vyre-conform 0.1.0

Conformance suite for vyre backends — proves byte-identical output to CPU reference
Documentation
//! Regression persistence and checkpoint serialization.

use crate::spec::types::ParityFailure;
use crate::verify::regression;
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::SyncSender;
use std::time::{SystemTime, UNIX_EPOCH};

#[inline]
pub(crate) fn start_regression_writer() -> (SyncSender<ParityFailure>, std::thread::JoinHandle<()>)
{
    let (tx, rx) = std::sync::mpsc::sync_channel(1024);

    let writer = std::thread::spawn(move || {
        let mut batch = Vec::new();
        let mut seen = HashMap::new();

        while let Ok(failure) = rx.recv() {
            batch.push(failure);
            if batch.len() >= 64 {
                flush_regression_batch(&mut batch, &mut seen);
            }
        }

        flush_regression_batch(&mut batch, &mut seen);
    });

    (tx, writer)
}

/// Hands a parity failure to the background regression writer, or persists it directly.
///
/// When `tx` holds a sender, the failure is cloned and sent to the writer thread. If that
/// send fails, the failure is saved synchronously via `regression::save_binary` instead of
/// being dropped. A send fails only when the receiving writer has hung up, for example after
/// it panicked. The same synchronous path is used when no writer is configured (`tx` is
/// `None`).
///
/// Persistence errors are reported on stderr rather than returned, so a failing disk never
/// aborts the conformance run.
#[inline]
pub(crate) fn send_or_store_failure(
    tx: &Option<SyncSender<ParityFailure>>,
    failure: &ParityFailure,
) {
    if let Some(sender) = tx {
        if sender.send(failure.clone()).is_ok() {
            return;
        }
        // The writer thread is gone; fall through so the regression is not silently lost.
    }

    if let Err(error) = regression::save_binary(failure) {
        eprintln!(
            "vyre-conform: could not persist streaming regression for {}: {error}. Fix: ensure regressions/ is writable.",
            failure.op_id
        );
    }
}

/// Turns an op id into a single safe path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept as-is. Every other character (including
/// `/`, `.`, whitespace and any non-ASCII char) becomes `%`, one `%` per char.
fn sanitize_id(id: &str) -> String {
    let mut safe = String::with_capacity(id.len());
    for ch in id.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_');
        safe.push(if allowed { ch } else { '%' });
    }
    safe
}

static REGRESSION_COUNTER: AtomicU64 = AtomicU64::new(0);

fn flush_regression_batch(
    batch: &mut Vec<ParityFailure>,
    seen: &mut HashMap<String, HashSet<Vec<u8>>>,
) {
    let mut synced = HashSet::new();

    for failure in batch.drain(..) {
        let set = seen.entry(failure.op_id.clone()).or_default();
        if !set.insert(failure.input.clone()) {
            continue;
        }

        let dir = PathBuf::from(std::env!("CARGO_MANIFEST_DIR"))
            .join("regressions")
            .join(sanitize_id(&failure.op_id));
        if std::fs::create_dir_all(&dir).is_err() {
            continue;
        }

        let name = format!(
            "stream-{:016x}.bin",
            REGRESSION_COUNTER.fetch_add(1, Ordering::Relaxed)
        );
        let path = dir.join(&name);
        let tmp = unique_temp_path(&path);

        if let Ok(mut output) = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)
        {
            if output.write_all(&failure.input).is_ok() && std::fs::rename(&tmp, &path).is_ok() {
                synced.insert(dir);
            }
        }
    }

    for dir in synced {
        if let Ok(file) = std::fs::File::open(&dir) {
            let _ = file.sync_all();
        }
    }
}

/// Streaming progress record for one shard, serialized to JSON by `write_checkpoint` and
/// stored as `regressions/streaming-progress-shard<shard_id>.json`.
///
/// NOTE(review): serde's derive serializes fields in declaration order. Reordering these
/// fields therefore changes the key order of the JSON written to disk.
#[derive(serde::Serialize)]
struct Checkpoint {
    /// Test id recorded as this shard's progress marker. Presumably this is the first test
    /// not yet completed, i.e. where a resumed run continues; confirm against the reader.
    next_test_id: u64,
    /// Shard this checkpoint belongs to. The same value is embedded in the file name.
    shard_id: u64,
    /// Total number of shards in the run, presumably; verify against the caller.
    shard_count: u64,
}

#[inline]
pub(crate) fn write_checkpoint(
    shard_id: u64,
    next_test_id: u64,
    shard_count: u64,
) -> std::io::Result<()> {
    let path = PathBuf::from(std::env!("CARGO_MANIFEST_DIR"))
        .join("regressions")
        .join(format!("streaming-progress-shard{shard_id}.json"));
    std::fs::create_dir_all(
        path.parent().ok_or_else(|| {
            std::io::Error::other("Fix: checkpoint path has no parent directory.")
        })?,
    )?;
    let tmp = unique_temp_path(&path);
    let bytes = serde_json::to_vec(&Checkpoint {
        next_test_id,
        shard_id,
        shard_count,
    })?;

    let mut file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(&tmp)?;
    file.write_all(&bytes)?;
    file.sync_all()?;
    std::fs::rename(&tmp, &path)?;
    Ok(())
}

/// Builds a sibling temp path for `path` that concurrent writers are unlikely to share.
///
/// The temp name is `<file name>.tmp.<pid>.<thread>.<unix nanos>.<counter>`:
/// - `<thread>` is the `Debug` form of the current thread id, with every non-alphanumeric
///   char replaced by `_`.
/// - `<unix nanos>` falls back to 0 if the clock reads before the Unix epoch.
/// - `<counter>` is taken from the shared `REGRESSION_COUNTER`.
///
/// If `path` has no UTF-8 file name, `streaming-regression` is used as the base instead.
fn unique_temp_path(path: &Path) -> PathBuf {
    let base = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => "streaming-regression",
    };

    let mut thread_tag = String::new();
    for ch in format!("{:?}", std::thread::current().id()).chars() {
        thread_tag.push(if ch.is_ascii_alphanumeric() { ch } else { '_' });
    }

    let stamp: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let sequence = REGRESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
    let process = std::process::id();

    path.with_file_name(format!(
        "{base}.tmp.{process}.{thread_tag}.{stamp}.{sequence}"
    ))
}