use crate::spec::types::ParityFailure;
use crate::verify::regression;
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::SyncSender;
use std::time::{SystemTime, UNIX_EPOCH};
/// Spawns the background thread that persists streamed parity failures.
///
/// Returns the sending half of a bounded channel (capacity 1024, so `send`
/// blocks while it is full) together with the writer's join handle.
///
/// The writer waits for a failure, then takes whatever else is already queued
/// without waiting, up to a batch of 64. It hands that batch to
/// `flush_regression_batch` immediately. Under load failures are still written
/// in batches, and when they arrive slowly they reach disk without waiting for
/// a full batch. Holding them in memory would mean a killed run loses them.
///
/// The loop ends once every sender has been dropped. Join the handle afterwards
/// to wait for the last batch to be written.
#[inline]
pub(crate) fn start_regression_writer() -> (SyncSender<ParityFailure>, std::thread::JoinHandle<()>)
{
    const CHANNEL_CAPACITY: usize = 1024;
    const BATCH_SIZE: usize = 64;
    let (tx, rx) = std::sync::mpsc::sync_channel::<ParityFailure>(CHANNEL_CAPACITY);
    let writer = std::thread::spawn(move || {
        let mut batch = Vec::with_capacity(BATCH_SIZE);
        // Per-op set of inputs already persisted by this writer (dedup state).
        let mut seen = HashMap::new();
        while let Ok(first) = rx.recv() {
            batch.push(first);
            while batch.len() < BATCH_SIZE {
                match rx.try_recv() {
                    Ok(next) => batch.push(next),
                    // Empty: flush what we have now. Disconnected: flush, and
                    // the next `recv` returns Err, which ends the outer loop.
                    Err(_) => break,
                }
            }
            // `flush_regression_batch` drains `batch`, so nothing is carried
            // over between iterations or left behind when the loop ends.
            flush_regression_batch(&mut batch, &mut seen);
        }
    });
    (tx, writer)
}
/// Hands `failure` to the background regression writer, or persists it directly.
///
/// When `tx` is `Some`, a clone of the failure is sent to the writer thread.
/// Because the channel is a `SyncSender`, this blocks while the channel buffer
/// is full.
///
/// The failure is saved synchronously with `regression::save_binary` in two
/// cases: `tx` is `None`, or the send fails because the writer thread is gone
/// (its receiver was dropped). In neither case is the failure discarded.
///
/// A save error is reported on stderr and otherwise ignored so the caller's
/// run continues.
#[inline]
pub(crate) fn send_or_store_failure(
    tx: &Option<SyncSender<ParityFailure>>,
    failure: &ParityFailure,
) {
    let delivered = tx
        .as_ref()
        .is_some_and(|sender| sender.send(failure.clone()).is_ok());
    if delivered {
        return;
    }
    if let Err(error) = regression::save_binary(failure) {
        eprintln!(
            "vyre-conform: could not persist streaming regression for {}: {error}. Fix: ensure regressions/ is writable.",
            failure.op_id
        );
    }
}
/// Turns an op id into a single, filesystem-safe directory name.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept verbatim. Every other
/// character (including `/`, `.`, spaces and any non-ASCII character) becomes
/// `%`, so the result can never escape or nest inside the parent directory.
/// The mapping is lossy: distinct ids may sanitize to the same name.
fn sanitize_id(id: &str) -> String {
    let mut safe = String::with_capacity(id.len());
    for ch in id.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_');
        safe.push(if allowed { ch } else { '%' });
    }
    safe
}
/// Process-local counter used to make generated file names unique within this
/// process.
///
/// `fetch_add` is an atomic read-modify-write, so every caller gets a distinct
/// value even with `Ordering::Relaxed`; no other memory is synchronized through
/// it. The counter starts at zero in every process, so on its own it does not
/// distinguish files written by separate runs or concurrently running processes.
static REGRESSION_COUNTER: AtomicU64 = AtomicU64::new(0);
fn flush_regression_batch(
batch: &mut Vec<ParityFailure>,
seen: &mut HashMap<String, HashSet<Vec<u8>>>,
) {
let mut synced = HashSet::new();
for failure in batch.drain(..) {
let set = seen.entry(failure.op_id.clone()).or_default();
if !set.insert(failure.input.clone()) {
continue;
}
let dir = PathBuf::from(std::env!("CARGO_MANIFEST_DIR"))
.join("regressions")
.join(sanitize_id(&failure.op_id));
if std::fs::create_dir_all(&dir).is_err() {
continue;
}
let name = format!(
"stream-{:016x}.bin",
REGRESSION_COUNTER.fetch_add(1, Ordering::Relaxed)
);
let path = dir.join(&name);
let tmp = unique_temp_path(&path);
if let Ok(mut output) = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&tmp)
{
if output.write_all(&failure.input).is_ok() && std::fs::rename(&tmp, &path).is_ok() {
synced.insert(dir);
}
}
}
for dir in synced {
if let Ok(file) = std::fs::File::open(&dir) {
let _ = file.sync_all();
}
}
}
/// Per-shard streaming progress record, serialized as JSON by `write_checkpoint`.
///
/// serde's derive uses the field names unchanged as the JSON keys, and
/// serde_json emits them in declaration order. Renaming or reordering fields
/// therefore changes the on-disk format.
/// NOTE(review): check every reader of `streaming-progress-shard*.json` before
/// changing this struct.
#[derive(serde::Serialize)]
struct Checkpoint {
    /// Presumably the first test id this shard has not yet processed — confirm
    /// against the resume logic.
    next_test_id: u64,
    /// Shard identifier; `write_checkpoint` also embeds it in the file name.
    shard_id: u64,
    /// Presumably the total number of shards in the run — confirm with callers.
    shard_count: u64,
}
/// Atomically records streaming progress for one shard.
///
/// The JSON-serialized `Checkpoint` ends up in
/// `<CARGO_MANIFEST_DIR>/regressions/streaming-progress-shard{shard_id}.json`.
/// The bytes are written to a temp file next to the target, synced to disk,
/// and then renamed over the target. Readers therefore see either the previous
/// checkpoint or the new one, never a partial file. If any step before the
/// rename completes fails, the temp file is removed.
///
/// After the rename, the directory is fsynced on a best-effort basis so the new
/// name itself survives a crash.
///
/// # Errors
///
/// Returns the underlying I/O error if creating the directory, serializing,
/// opening, writing, syncing or renaming fails.
#[inline]
pub(crate) fn write_checkpoint(
    shard_id: u64,
    next_test_id: u64,
    shard_count: u64,
) -> std::io::Result<()> {
    let path = PathBuf::from(std::env!("CARGO_MANIFEST_DIR"))
        .join("regressions")
        .join(format!("streaming-progress-shard{shard_id}.json"));
    let dir = path.parent().ok_or_else(|| {
        std::io::Error::other("Fix: checkpoint path has no parent directory.")
    })?;
    std::fs::create_dir_all(dir)?;
    let bytes = serde_json::to_vec(&Checkpoint {
        next_test_id,
        shard_id,
        shard_count,
    })?;
    let tmp = unique_temp_path(&path);
    let staged = (|| -> std::io::Result<()> {
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&tmp)?;
        file.write_all(&bytes)?;
        file.sync_all()?;
        // Close the handle before renaming; some platforms refuse to move open files.
        drop(file);
        std::fs::rename(&tmp, &path)
    })();
    if let Err(error) = staged {
        // Do not leave orphaned `*.tmp.*` files behind in regressions/.
        let _ = std::fs::remove_file(&tmp);
        return Err(error);
    }
    // Best-effort: opening a directory as a file is not supported everywhere.
    if let Ok(handle) = std::fs::File::open(dir) {
        let _ = handle.sync_all();
    }
    Ok(())
}
/// Builds a temporary sibling of `path` to stage an atomic write-then-rename.
///
/// The result lives in the same directory as `path` and is named
/// `<file name>.tmp.<pid>.<thread>.<unix nanos>.<counter>`:
/// - `<thread>` is the debug form of the current thread id, with every
///   non-alphanumeric character replaced by `_`;
/// - `<counter>` is the next value of `REGRESSION_COUNTER`;
/// - a clock before the Unix epoch yields `0` for the nanos;
/// - if `path` has no UTF-8 file name, `streaming-regression` is used as the base.
fn unique_temp_path(path: &Path) -> PathBuf {
    let pid = std::process::id();

    let raw_thread = format!("{:?}", std::thread::current().id());
    let mut thread_tag = String::with_capacity(raw_thread.len());
    for ch in raw_thread.chars() {
        thread_tag.push(match ch {
            c if c.is_ascii_alphanumeric() => c,
            _ => '_',
        });
    }

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };

    let sequence = REGRESSION_COUNTER.fetch_add(1, Ordering::Relaxed);

    let base = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => "streaming-regression",
    };

    path.with_file_name(format!(
        "{}.tmp.{}.{}.{}.{}",
        base, pid, thread_tag, nanos, sequence
    ))
}