use crate::graphs::build_graphs_html;
use crate::process_watcher::ProcessPeakMem;
use crate::snapshot::{Comparison, Diff, LogInfo, RenderContext, Run, RunInfo, Versions};
use crate::system_monitor::{ActivityCounters, MonitorTimeRange, SystemMonitor, TraceEvent};
use crate::trace_downloader::TraceFile;
use crate::{TraceExtraArgs, collect_traces_for_path, trace_downloader};
use anyhow::{Context, Result, bail};
use clap::Args;
use gpu_trace_perf::traces_config;
use log::{debug, error, info, warn};
use sha2::{Digest, Sha256};
use std::fs::File;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::Duration;
// CLI arguments for the `replay` subcommand: replay a set of GPU traces on a
// device and compare rendered snapshots against expected checksums from a
// TOML config. (Plain `//` comments are used deliberately: `///` doc comments
// on clap-derive fields would become CLI help text and change behavior.)
#[derive(Debug, Args)]
#[command(name = "replay")]
pub struct Replay {
    // Path to the traces TOML config (traces + per-device checksums).
    #[arg(long)]
    config: PathBuf,
    // Device name used to look up each trace's expected checksum.
    #[arg(long)]
    device: String,
    // Directory where index.html, graphs.html, patches, etc. are written.
    #[arg(long = "output")]
    pub output: PathBuf,
    // Concurrent replay jobs; 0 means one per CPU (see `replay()`).
    #[arg(short, long, default_value = "0")]
    jobs: usize,
    // Path to a JWT file passed to the trace downloader for auth.
    #[arg(long)]
    jwt: Option<PathBuf>,
    // Optional override for the traces database location.
    #[arg(long)]
    traces_db: Option<PathBuf>,
    // Trace cache directory; defaults to $XDG_CACHE_HOME/gpu-trace-perf.
    #[arg(long)]
    cache_dir: Option<PathBuf>,
    // Cache disk usage limit in bytes (default is 5 GiB).
    #[arg(long, default_value = "5368709120")]
    disk_limit: u64,
    // Proxy prefix prepended to the traces download URL when set.
    #[arg(long)]
    download_caching_proxy: Option<String>,
    // Long-term upload destination for passing snapshot images.
    #[arg(long)]
    snapshots_url: Option<String>,
    // Per-CI-job upload destination for failing snapshots (4-week expiry).
    #[arg(long)]
    job_url: Option<String>,
    // When set, snapshot upload failures are fatal instead of warnings.
    #[arg(long)]
    snapshot_url_must_work: bool,
    // Disable all snapshot uploads.
    #[arg(long)]
    no_upload: bool,
    // Per-trace replay timeout in seconds.
    #[arg(long = "timeout", default_value = "60")]
    timeout_secs: u64,
    // NOTE(review): declared but never read in this file — presumably consumed
    // by an external caller; confirm before removing.
    #[arg(long = "artifact-url")]
    artifact_url: Option<String>,
    // Shard the trace list: run every `fraction`-th trace…
    #[arg(long, default_value = "1")]
    fraction: usize,
    // …starting from this 1-based offset.
    #[arg(long, default_value = "1")]
    fraction_start: usize,
}
/// Final outcome of replaying one trace, as consumed by logging and the HTML
/// report builder.
struct CiReplayResult {
    /// Trace path as listed in the config.
    trace_path: String,
    /// Checksum expected for this device (from the config).
    expected_checksum: String,
    /// Checksum of the rendered snapshot; empty when no PNG was produced.
    actual_checksum: String,
    /// URL of the uploaded actual image, or a local HTML-relative path.
    actual_image_url: String,
    /// URL where the baseline image is expected to live.
    expected_image_url: String,
    /// Fraction of differing pixels vs. baseline; 0.0 on pass, initialized to
    /// 1.0 and refined by `fill_diff_info` on failure.
    diff_fraction: f32,
    /// HTML-relative path to the preview of the actual image.
    actual_preview: String,
    /// HTML-relative path to the preview of the expected image, if fetched.
    expected_preview: String,
    /// HTML-relative path to the saved diff image, if computed.
    diff_image: String,
    /// HTML-relative path to the preview of the diff image.
    diff_image_preview: String,
    /// HTML-relative path to the replay log file.
    log_path: String,
    /// Replay runtime in seconds.
    execution_time: f32,
    /// Full replay log text (mined for tool/driver versions in the report).
    log_text: String,
    /// Frame names that rendered inconsistently within one snapshot run.
    flaky_frames: Vec<String>,
    /// Error message when uploading the snapshot failed, else `None`.
    upload_error: Option<String>,
    /// Peak memory usage observed for the replay process.
    peak: ProcessPeakMem,
}
/// Raw result of replaying one trace, before upload and diff post-processing
/// turn it into a `CiReplayResult`.
struct SnapshotOutput {
    /// Trace path as listed in the config.
    trace_path: String,
    /// SHA-256 (hex) of the chosen frame's decoded pixels; empty if no PNG.
    actual_checksum: String,
    /// HTML-relative path to the chosen frame's PNG; empty if no PNG.
    png_html_path: String,
    /// Absolute path of the chosen frame's PNG on disk, if one was produced.
    actual_png_path: Option<PathBuf>,
    /// HTML-relative path to the generated preview image.
    actual_preview: String,
    /// HTML-relative path to the replay log.
    log_path: String,
    /// Replay runtime in seconds.
    execution_time: f32,
    /// Full replay log text.
    log_text: String,
    /// Frame names that rendered inconsistently within the run.
    flaky_frames: Vec<String>,
    /// Raw PNG file bytes of the chosen frame (uploaded on demand).
    png_bytes: Vec<u8>,
    /// Peak memory usage of the replay process.
    peak: ProcessPeakMem,
    /// Decoded chosen frame, used later for pixel diffing.
    png_image: image::DynamicImage,
}
impl SnapshotOutput {
    /// Uploads this snapshot's PNG bytes to `base_url`, keyed by the actual
    /// checksum, skipping the transfer when the object already exists.
    /// Returns the resulting URL.
    async fn upload_to(
        &self,
        base_url: &str,
        state: &ReplayState,
        expires: Option<Duration>,
    ) -> Result<String> {
        state
            .cache
            .upload_if_absent(base_url, &self.actual_checksum, &self.png_bytes, expires)
            .await
    }

    /// Uploads the snapshot image and returns its URL, or `Ok(None)` when
    /// nothing was uploaded.
    ///
    /// Destination selection:
    /// - passing snapshots (actual == expected) go to `snapshots_url` with no
    ///   expiry;
    /// - failing snapshots go to `job_url` (4-week expiry) when configured,
    ///   falling back to `snapshots_url` otherwise;
    /// - a passing upload rejected with an `UploadForbidden` error retries
    ///   against `job_url`, unless `snapshot_url_must_work` is set.
    ///
    /// Nothing is uploaded when the checksum is empty (no PNG produced),
    /// uploads are disabled, or no snapshots URL is configured. After a
    /// successful upload the local PNG is deleted to save disk space.
    async fn upload(&self, state: &ReplayState, expected_checksum: &str) -> Result<Option<String>> {
        let snapshots: Option<(&String, Option<Duration>)> =
            state.snapshots_url.as_ref().map(|url| (url, None));
        // Job-scoped uploads expire after four weeks.
        let job = state
            .job_url
            .as_ref()
            .map(|url| (url, Some(std::time::Duration::from_secs(4 * 7 * 24 * 3600))));
        if self.actual_checksum.is_empty() {
            return Ok(None);
        }
        if state.no_upload {
            return Ok(None);
        }
        let Some(snapshots) = snapshots else {
            return Ok(None);
        };
        let passed = self.actual_checksum == expected_checksum;
        let (mut upload_url, mut expires) = if passed {
            snapshots
        } else {
            job.unwrap_or(snapshots)
        };
        let mut url = self.upload_to(upload_url, state, expires).await;
        if let (Err(e), Some(job)) = (&url, job) {
            // A 403 from the shared snapshots bucket is tolerated by retrying
            // against the job URL, unless the caller demands the bucket works.
            if passed
                && !state.snapshot_url_must_work
                && e.downcast_ref::<trace_downloader::UploadForbidden>()
                    .is_some()
            {
                debug!(
                    "HTTP 403 uploading {} to snapshots_url, falling back to job_url",
                    self.actual_checksum
                );
                (upload_url, expires) = job;
                url = self.upload_to(upload_url, state, expires).await;
            }
        }
        let url = match url {
            Ok(url) => url,
            // Upload failures are non-fatal unless explicitly required.
            Err(e) if !state.snapshot_url_must_work => {
                warn!(
                    "Failed to upload {} to {upload_url}: {e:#}",
                    self.actual_checksum
                );
                return Ok(None);
            }
            Err(e) => return Err(e),
        };
        // The image is now available remotely; drop the local copy.
        if let Some(ref path) = self.actual_png_path {
            if let Err(e) = std::fs::remove_file(path) {
                warn!("Failed to remove local PNG after upload: {e}");
            }
        }
        Ok(Some(url))
    }
}
/// The representative frame selected by `analyze_snapshot` from a run's PNGs.
struct AnalyzedSnapshot {
    /// SHA-256 (hex) of the frame's decoded pixel data.
    checksum: String,
    /// Raw PNG file bytes of the chosen frame.
    bytes: Vec<u8>,
    /// Decoded image of the chosen frame.
    image: image::DynamicImage,
    /// On-disk path of the chosen frame's PNG.
    path: PathBuf,
    /// Frame names that rendered inconsistently (mix of pass/fail frames).
    flaky_frames: Vec<String>,
}
/// Decodes every PNG frame a snapshot run produced, selects one representative
/// frame, and prunes redundant frame files from disk.
///
/// Checksums are computed over the decoded pixel data — not the PNG bytes —
/// so encoder-level differences don't affect matching. If only some frames
/// differ from `expected_checksum`, those frames are flagged as flaky. The
/// chosen frame is the first differing one (or frame 0 when all match).
/// Frames whose pixels match the expected checksum, or duplicate a frame
/// already kept, are deleted from disk; each distinct unexpected rendering is
/// kept for inspection.
fn analyze_snapshot(
    result: &crate::snapshot::SnapshotResult,
    trace_output_path: &Path,
    expected_checksum: &str,
) -> Result<AnalyzedSnapshot> {
    if result.files.is_empty() {
        anyhow::bail!("snapshot produced no PNG files");
    }
    // Per-frame decoded data, held until we know which frame to report.
    struct Frame {
        name: String,
        path: PathBuf,
        checksum: String,
        bytes: Vec<u8>,
        image: image::DynamicImage,
    }
    let mut frames: Vec<Frame> = Vec::new();
    for file in &result.files {
        let png_path = trace_output_path.join(file);
        let file_bytes = std::fs::read(&png_path)
            .with_context(|| format!("reading snapshot PNG {}", png_path.display()))?;
        let decoded = image::load_from_memory(&file_bytes)
            .with_context(|| format!("decoding snapshot PNG {}", png_path.display()))?;
        // Hash the raw pixel buffer so PNG encoding differences don't matter.
        let checksum = format!(
            "{:x}",
            Sha256::new().chain_update(decoded.as_bytes()).finalize()
        );
        frames.push(Frame {
            name: file.to_string_lossy().to_string(),
            path: png_path,
            checksum,
            bytes: file_bytes,
            image: decoded,
        });
    }
    let n_differing = frames
        .iter()
        .filter(|f| f.checksum != expected_checksum)
        .count();
    // Flaky = a mix of matching and non-matching frames within one run.
    let flaky_frames: Vec<String> = if n_differing > 0 && n_differing < frames.len() {
        frames
            .iter()
            .filter(|f| f.checksum != expected_checksum)
            .map(|f| f.name.clone())
            .collect()
    } else {
        vec![]
    };
    // Prefer reporting a failing frame; otherwise fall back to the first one.
    let chosen_idx = frames
        .iter()
        .position(|f| f.checksum != expected_checksum)
        .unwrap_or(0);
    // Keep one file per distinct unexpected checksum; delete everything else.
    let mut kept_checksums: std::collections::HashSet<String> = std::collections::HashSet::new();
    kept_checksums.insert(frames[chosen_idx].checksum.clone());
    for (idx, frame) in frames.iter().enumerate() {
        if idx == chosen_idx {
            continue;
        }
        if frame.checksum == expected_checksum || kept_checksums.contains(&frame.checksum) {
            if let Err(e) = std::fs::remove_file(&frame.path) {
                warn!("Failed to remove redundant frame {}: {e}", frame.name);
            }
        } else {
            kept_checksums.insert(frame.checksum.clone());
        }
    }
    let chosen = frames.remove(chosen_idx);
    Ok(AnalyzedSnapshot {
        checksum: chosen.checksum,
        bytes: chosen.bytes,
        image: chosen.image,
        path: chosen.path,
        flaky_frames,
    })
}
/// Replays one downloaded trace synchronously and captures snapshot output.
///
/// Intended to run on a blocking thread (called via `spawn_blocking`).
/// Identifies the replay tool for the trace file, snapshots it into
/// `output_dir`, writes the tool log, and analyzes the produced frames.
/// When the run yields no PNG files at all, returns a `SnapshotOutput` with
/// an empty `actual_checksum` so the caller reports a failure instead of
/// aborting the whole run.
fn run_snapshot_for_trace(
    trace_file: TraceFile,
    job: TraceJob,
    cache_dir: &Path,
    output_dir: &str,
    timeout: Duration,
) -> Result<SnapshotOutput> {
    let TraceJob {
        path: trace_path,
        expected_checksum,
        singlethread: _,
        nonloopable,
        replay_args,
    } = job;
    let local_path = trace_file.path();
    // The same extra args are offered to whichever tool matches the trace.
    let extra = TraceExtraArgs {
        angle: replay_args.clone(),
        apitrace: replay_args.clone(),
        gfxreconstruct: replay_args,
    };
    let tool = collect_traces_for_path(cache_dir, local_path, false, nonloopable, &extra);
    if tool.is_empty() {
        bail!("Failed to parse {:?} as a trace test.\n", local_path);
    } else if tool.len() > 1 {
        // Exactly one trace test must match a single trace file.
        bail!("Found a directory of traces at {:?}.\n", local_path);
    }
    let tool = &tool[0];
    if !tool.can_snapshot() {
        anyhow::bail!(
            "trace type does not support snapshotting: {}",
            local_path.display()
        );
    }
    info!("Running {}", tool.name());
    // NOTE(review): the literal `5` looks like a frame/iteration count for the
    // snapshot — confirm against `snapshot()`'s signature.
    let snapshot = tool
        .snapshot(output_dir, 5, timeout)
        .with_context(|| format!("snapshotting {trace_path}"))?;
    debug!("Finished {}", tool.name());
    let trace_output_path = Path::new(output_dir).join(tool.output_subdir());
    let log_abs = trace_output_path.join("log.txt");
    std::fs::write(&log_abs, snapshot.log())
        .with_context(|| format!("writing log for {trace_path}"))?;
    // Report artifact paths relative to the HTML output root.
    let log_path = crate::snapshot::html_filename(&log_abs, Path::new(output_dir))
        .with_context(|| format!("computing log html path for {trace_path}"))?;
    if snapshot.files.is_empty() {
        // No frames rendered: return a sentinel output with an empty checksum.
        return Ok(SnapshotOutput {
            trace_path,
            actual_checksum: "".to_string(),
            png_html_path: "".to_string(),
            actual_png_path: None,
            actual_preview: "".to_string(),
            log_path,
            execution_time: snapshot.runtime.as_secs_f32(),
            log_text: snapshot.log(),
            flaky_frames: vec![],
            png_bytes: vec![],
            png_image: image::DynamicImage::new_rgba8(0, 0),
            peak: snapshot.output.peak,
        });
    }
    let analyzed = analyze_snapshot(&snapshot, &trace_output_path, &expected_checksum)
        .with_context(|| format!("computing checksum for {trace_path}"))?;
    let first_png_html_path = crate::snapshot::html_filename(&analyzed.path, Path::new(output_dir))
        .with_context(|| format!("computing actual PNG html path for {trace_path}"))?;
    let actual_preview_abs = crate::snapshot::generate_preview(&analyzed.image, &analyzed.path)
        .with_context(|| format!("generating actual preview for {trace_path}"))?;
    let actual_preview = crate::snapshot::html_filename(&actual_preview_abs, Path::new(output_dir))
        .with_context(|| format!("computing actual preview html path for {trace_path}"))?;
    Ok(SnapshotOutput {
        trace_path,
        actual_checksum: analyzed.checksum,
        png_html_path: first_png_html_path,
        actual_png_path: Some(analyzed.path),
        actual_preview,
        log_path,
        execution_time: snapshot.runtime.as_secs_f32(),
        log_text: snapshot.log(),
        flaky_frames: analyzed.flaky_frames,
        png_bytes: analyzed.bytes,
        png_image: analyzed.image,
        peak: snapshot.output.peak,
    })
}
/// Fills in the diff-related fields of `ci_result` by comparing the rendered
/// image against the expected baseline.
///
/// Sets `diff_fraction` to 0.0 and returns immediately on a checksum match.
/// Otherwise fetches the expected image (only when `expected_image_url` is an
/// http(s) URL), computes the pixel-diff fraction and a diff image, saves
/// both next to the trace output, and records HTML-relative paths and
/// previews. Every failure is logged and leaves the affected fields at their
/// defaults — this function is best-effort and never fails the replay.
async fn fill_diff_info(
    ci_result: &mut CiReplayResult,
    first_png_image: image::DynamicImage,
    out_dir: &str,
    cache: &trace_downloader::TraceDownloader,
) {
    if ci_result.actual_checksum == ci_result.expected_checksum {
        ci_result.diff_fraction = 0.0;
        return;
    }
    // Without a fetchable baseline there is nothing to diff against.
    if !ci_result.expected_image_url.starts_with("http") {
        return;
    }
    // The log lives in the trace's output subdirectory; reuse that location
    // for the expected/diff images.
    let trace_subdir = Path::new(&ci_result.log_path)
        .parent()
        .unwrap_or(Path::new(""));
    let trace_output_path = Path::new(out_dir).join(trace_subdir);
    let out_path = Path::new(out_dir);
    let expected_bytes = match cache.fetch_bytes(&ci_result.expected_image_url).await {
        Ok(b) => b,
        Err(e) => {
            warn!(
                "Failed to fetch expected image for {}: {e:#}",
                ci_result.trace_path
            );
            return;
        }
    };
    let expected_image = match image::load_from_memory(&expected_bytes) {
        Ok(img) => img,
        Err(e) => {
            warn!(
                "Failed to decode expected image for {}: {e:#}",
                ci_result.trace_path
            );
            return;
        }
    };
    ci_result.diff_fraction =
        crate::snapshot::image_diff_fraction(&expected_image, &first_png_image);
    let diff_image_data = match image_diff::diff(&expected_image, &first_png_image) {
        Ok(img) => img,
        Err(e) => {
            warn!(
                "Failed to compute diff image for {}: {e:#}",
                ci_result.trace_path
            );
            return;
        }
    };
    // Persist expected and diff images next to the actual snapshot so the
    // HTML report can link to them.
    let expected_png_path = trace_output_path.join("snapshot-0000-expected.png");
    if let Err(e) = expected_image.save(&expected_png_path) {
        warn!(
            "Failed to save expected image for {}: {e:#}",
            ci_result.trace_path
        );
        return;
    }
    let diff_png_path = trace_output_path.join("snapshot-0000-diff.png");
    if let Err(e) = diff_image_data.save(&diff_png_path) {
        warn!(
            "Failed to save diff image for {}: {e:#}",
            ci_result.trace_path
        );
        return;
    }
    // From here on, failures only degrade the report (missing previews).
    match crate::snapshot::generate_preview(&expected_image, &expected_png_path) {
        Ok(p) => match crate::snapshot::html_filename(&p, out_path) {
            Ok(s) => ci_result.expected_preview = s,
            Err(e) => warn!("Failed to get expected preview html path: {e:#}"),
        },
        Err(e) => warn!(
            "Failed to generate expected preview for {}: {e:#}",
            ci_result.trace_path
        ),
    }
    match crate::snapshot::generate_preview(&diff_image_data, &diff_png_path) {
        Ok(p) => match crate::snapshot::html_filename(&p, out_path) {
            Ok(s) => ci_result.diff_image_preview = s,
            Err(e) => warn!("Failed to get diff preview html path: {e:#}"),
        },
        Err(e) => warn!(
            "Failed to generate diff preview for {}: {e:#}",
            ci_result.trace_path
        ),
    }
    match crate::snapshot::html_filename(&diff_png_path, out_path) {
        Ok(s) => ci_result.diff_image = s,
        Err(e) => warn!("Failed to get diff image html path: {e:#}"),
    }
}
/// Renders the replay results into the comparison HTML report.
///
/// Builds one `Diff` row per result (baseline vs. this device's run), sorts
/// rows with the largest pixel diffs first (ties broken by trace path for a
/// stable ordering), and hands everything to the snapshot renderer.
fn build_ci_replay_html(
    results: &[&CiReplayResult],
    device: &str,
    patch_url: String,
    graphs_url: String,
) -> Result<String> {
    let mut run_b_versions = Versions::new();
    let mut rows: Vec<Diff> = Vec::with_capacity(results.len());
    for r in results {
        // Accumulate tool/driver version info parsed out of each replay log.
        run_b_versions.add(LogInfo::from(r.log_text.as_str()));
        rows.push(Diff {
            trace: r.trace_path.clone(),
            frame: String::new(),
            run_a: Run {
                image: r.expected_image_url.clone(),
                image_preview: r.expected_preview.clone(),
                log: String::new(),
                comment: "Baseline".to_string(),
                execution_time: 0.0,
            },
            run_b: Run {
                image: r.actual_image_url.clone(),
                image_preview: r.actual_preview.clone(),
                log: r.log_path.clone(),
                comment: String::new(),
                execution_time: r.execution_time,
            },
            diff: r.diff_fraction,
            diff_image: r.diff_image.clone(),
            diff_image_preview: r.diff_image_preview.clone(),
            flaky_frames: r.flaky_frames.clone(),
            peak_vram_mb: r.peak.vram_bytes.map(|b| (b / (1024 * 1024)) as f64),
            peak_sys_mem_mb: r.peak.sys_bytes.map(|b| (b / (1024 * 1024)) as f64),
            peak_rss_mb: r.peak.rss_bytes.map(|b| (b / (1024 * 1024)) as f64),
        });
    }
    rows.sort_by(|a, b| b.diff.total_cmp(&a.diff).then(a.trace.cmp(&b.trace)));
    let max_diff = rows.iter().map(|d| d.diff).fold(0.0f32, f32::max);
    crate::snapshot::render(&RenderContext {
        comparison: Comparison {
            max_diff,
            run_a: RunInfo::new("Baseline".to_string(), Versions::new()),
            run_b: RunInfo::new(format!("Device: {device}"), run_b_versions),
        },
        diffs: rows,
        patch_url,
        graphs_url,
    })
}
/// Generates a unified diff against the traces config that updates each
/// failing trace's checksum for `device` to the newly observed value.
///
/// `file` supplies the current config TOML text; `filename` is used in error
/// messages and the `a/…`/`b/…` diff headers (the previous code hard-coded a
/// placeholder and left the parameter unused). `failing` pairs each trace
/// path with its new checksum.
///
/// # Errors
/// Fails if the TOML cannot be parsed, if a trace/device/checksum entry has
/// an unexpected shape, or if any entry in `failing` is absent from the
/// config.
fn generate_checksums_patch<R: Read>(
    file: &mut R,
    filename: &str,
    device: &str,
    failing: &[(String, String)],
) -> Result<String> {
    let mut old = String::new();
    file.read_to_string(&mut old)
        .context("reading config for patch generation")?;
    // toml_edit preserves formatting and comments, keeping the diff minimal.
    let mut doc = old
        .parse::<toml_edit::DocumentMut>()
        .context("parsing config TOML for patch")?;
    let mut failing_map: std::collections::HashMap<&str, &str> = failing
        .iter()
        .map(|(path, actual)| (path.as_str(), actual.as_str()))
        .collect();
    let traces = doc
        .get_mut("traces")
        .with_context(|| format!("getting traces table from {filename}"))?
        .as_array_of_tables_mut()
        .context("treating traces table as an array of tables")?;
    for trace_table in traces.iter_mut() {
        let path = trace_table
            .get("path")
            .context("Getting path in a trace")?
            .as_str()
            .context("Getting path in a trace as a string")?
            .to_string();
        // Remove matched entries so leftovers can be reported as errors below.
        let Some(actual_checksum) = failing_map.remove(path.as_str()) else {
            continue;
        };
        let devices = trace_table
            .get_mut("devices")
            .with_context(|| format!("Getting devices table for {path}"))?;
        // `devices` may be written either as an inline table or a sub-table.
        let device_entry = if let Some(devices) = devices.as_inline_table_mut() {
            let (_, device_entry) = devices
                .get_key_value_mut(device)
                .with_context(|| format!("Getting device {device} for {path}"))?;
            device_entry
        } else if let Some(devices) = devices.as_table_mut() {
            devices
                .get_mut(device)
                .with_context(|| format!("Getting device {device} for {path}"))?
        } else {
            bail!("Failed to process devices as table or inline table");
        };
        // Same duality for the per-device entry holding the checksum.
        let checksum = if let Some(device_entry) = device_entry.as_inline_table_mut() {
            let (_, checksum) = device_entry
                .get_key_value_mut("checksum")
                .with_context(|| format!("Getting checksum for {path}/{device}"))?;
            checksum
        } else if let Some(device_entry) = device_entry.as_table_mut() {
            device_entry
                .get_mut("checksum")
                .with_context(|| format!("Getting checksum for {path}/{device}"))?
        } else {
            bail!("Failed to process device as inline table");
        };
        *checksum = toml_edit::value(actual_checksum);
    }
    if !failing_map.is_empty() {
        bail!(
            "Failed to process diff for {:?}",
            failing_map.iter().take(5)
        );
    }
    let new = doc.to_string();
    let text_diff = similar::TextDiff::from_lines(&old, &new);
    // Use the real config filename in the headers (was a hard-coded
    // "(unknown)" placeholder) so the patch applies cleanly with -p1 tooling.
    Ok(text_diff
        .unified_diff()
        .context_radius(5)
        .header(&format!("a/{filename}"), &format!("b/{filename}"))
        .to_string())
}
/// Everything needed to download and replay one trace for one device.
#[derive(Clone)]
struct TraceJob {
    /// Trace path within the traces database.
    path: String,
    /// Checksum expected for this device.
    expected_checksum: String,
    /// When true, the trace must run with no other replays in parallel
    /// (acquires every replay permit).
    singlethread: bool,
    /// Forwarded to trace collection; presumably marks traces that cannot be
    /// replayed in a loop — confirm against `collect_traces_for_path`.
    nonloopable: bool,
    /// Extra replay-tool arguments (trace-level + device-level combined).
    replay_args: Vec<String>,
}
impl TraceJob {
    /// Downloads the trace file, bounded by the download semaphore.
    ///
    /// Returns the local trace file together with the monitor time range
    /// spanning the download. The `active_downloads` gauge is decremented
    /// even when the download fails (previously an early `?` skipped the
    /// decrement, leaking an "active" slot in the monitor graphs).
    async fn download(&self, state: &mut ReplayState) -> Result<(TraceFile, MonitorTimeRange)> {
        // Permit is held until this function returns.
        let _dl_sem = state.dl_sem.clone().acquire_owned().await.unwrap();
        let start = state.monitor_start.elapsed();
        state
            .activity
            .active_downloads
            .fetch_add(1, Ordering::Relaxed);
        // Capture the result so the counter is balanced before propagating
        // any error.
        let result = state.cache.get(&state.download_url, &self.path).await;
        state
            .activity
            .active_downloads
            .fetch_sub(1, Ordering::Relaxed);
        let trace_file = result?;
        let end = state.monitor_start.elapsed();
        Ok((trace_file, MonitorTimeRange::new(start, end)))
    }

    /// Replays the trace and captures a snapshot, bounded by the replay
    /// semaphore.
    ///
    /// Single-threaded traces acquire every permit so they run alone; other
    /// traces take one permit each. The blocking snapshot work runs on the
    /// tokio blocking pool. The `active_replays` gauge is decremented even
    /// when the replay fails or the blocking task panics (previously the
    /// `?`/`??` on the task result skipped the decrement).
    async fn replay(
        &self,
        state: &mut ReplayState,
        trace_file: TraceFile,
    ) -> Result<(SnapshotOutput, MonitorTimeRange)> {
        let permits = if self.singlethread {
            state.replay_jobs as u32
        } else {
            1
        };
        let _replay_sem = state
            .replay_sem
            .clone()
            .acquire_many_owned(permits)
            .await
            .unwrap();
        let start = state.monitor_start.elapsed();
        state
            .activity
            .active_replays
            .fetch_add(1, Ordering::Relaxed);
        let joined = tokio::task::spawn_blocking({
            let out_dir = state.output_dir.clone();
            let trace_root = state.cache.root_for_file(trace_file.path()).to_owned();
            let job = self.clone();
            let timeout = state.timeout;
            move || run_snapshot_for_trace(trace_file, job, &trace_root, &out_dir, timeout)
        })
        .await;
        // Balance the counter before propagating join/replay errors.
        state
            .activity
            .active_replays
            .fetch_sub(1, Ordering::Relaxed);
        let output = joined.context("replay task panicked")??;
        Ok((
            output,
            MonitorTimeRange::new(start, state.monitor_start.elapsed()),
        ))
    }
}
/// Shared, cheaply clonable state handed to every per-trace replay task.
#[derive(Clone)]
struct ReplayState {
    /// Trace download/upload cache.
    cache: std::sync::Arc<trace_downloader::TraceDownloader>,
    /// Bounds concurrent downloads (created with 4 permits in `replay()`).
    dl_sem: std::sync::Arc<tokio::sync::Semaphore>,
    /// Bounds concurrent replays (`replay_jobs` permits).
    replay_sem: std::sync::Arc<tokio::sync::Semaphore>,
    /// Total replay permits; single-threaded traces acquire all of them.
    replay_jobs: usize,
    /// Base URL traces are fetched from (possibly proxy-prefixed).
    download_url: String,
    /// Long-term snapshot upload destination, if configured.
    snapshots_url: Option<String>,
    /// Per-CI-job upload destination for failing snapshots, if configured.
    job_url: Option<String>,
    /// When set, snapshot-URL upload failures are fatal.
    snapshot_url_must_work: bool,
    /// When set, skip all uploads.
    no_upload: bool,
    /// Directory all replay artifacts are written into.
    output_dir: String,
    /// Per-trace replay timeout.
    timeout: Duration,
    /// Live gauges (active downloads/replays) for the system monitor.
    activity: Arc<ActivityCounters>,
    /// Reference instant for monitor-relative timestamps.
    monitor_start: std::time::Instant,
    /// Collected per-trace timing events for the graphs page.
    trace_events: Arc<Mutex<Vec<TraceEvent>>>,
}
/// Downloads, replays, uploads, and diffs one trace end to end.
///
/// On replay failure, asks the cache whether the downloaded trace file is
/// corrupted; if the cache entry was invalidated, re-downloads and retries
/// the replay exactly once. Records a `TraceEvent` (download/replay time
/// ranges, pass/fail) for the monitoring graphs, uploads the snapshot, and
/// fills in diff information for the report.
async fn replay_one_trace(state: ReplayState, job: TraceJob) -> Result<CiReplayResult> {
    let mut state = state;
    let (trace_file, download_time) = job.download(&mut state).await?;
    let snapshot = match job.replay(&mut state, trace_file).await {
        Ok((snapshot, replay_time)) => {
            state.trace_events.lock().unwrap().push(TraceEvent {
                trace_name: job.path.clone(),
                download_start_ms: download_time.start,
                download_end_ms: download_time.end,
                replay_start_ms: replay_time.start,
                replay_end_ms: replay_time.end,
                passed: snapshot.actual_checksum == job.expected_checksum,
            });
            snapshot
        }
        Err(replay_err) => {
            // A corrupted cached download can make replay fail; retry once if
            // the cache detected and evicted a bad file.
            if state
                .cache
                .invalidate_if_corrupted(&state.download_url, &job.path)
                .await
                .unwrap_or_else(|e| {
                    log::warn!("Failed to check cache for {}: {e:#}", &job.path);
                    false
                })
            {
                log::info!("{}: retrying replay after cache invalidation", &job.path);
                let (trace_file, download_time) = job.download(&mut state).await?;
                let (snapshot, replay_time) = job.replay(&mut state, trace_file).await?;
                state.trace_events.lock().unwrap().push(TraceEvent {
                    trace_name: job.path.clone(),
                    download_start_ms: download_time.start,
                    download_end_ms: download_time.end,
                    replay_start_ms: replay_time.start,
                    replay_end_ms: replay_time.end,
                    passed: snapshot.actual_checksum == job.expected_checksum,
                });
                snapshot
            } else {
                return Err(replay_err);
            }
        }
    };
    let passed = snapshot.actual_checksum == job.expected_checksum;
    // Best guess at the baseline location: <snapshots_url>/<checksum>.png.
    let guess_expected_url = if let Some(snapshots) = &state.snapshots_url {
        format!(
            "{}/{}.png",
            snapshots.trim_end_matches('/'),
            job.expected_checksum
        )
    } else {
        String::new()
    };
    // Prefer the uploaded URL; fall back to the local HTML path. Upload
    // errors are carried in the result rather than failing the trace here.
    let (actual_image_url, expected_image_url, upload_error) =
        match snapshot.upload(&state, &job.expected_checksum).await {
            Ok(Some(upload_url)) => {
                let expected = if passed {
                    upload_url.clone()
                } else {
                    guess_expected_url
                };
                (upload_url, expected, None)
            }
            Ok(None) => (snapshot.png_html_path.clone(), guess_expected_url, None),
            Err(e) => (
                snapshot.png_html_path.clone(),
                guess_expected_url,
                Some(format!("{e:#}")),
            ),
        };
    let mut ci_result = CiReplayResult {
        trace_path: snapshot.trace_path,
        expected_checksum: job.expected_checksum,
        actual_checksum: snapshot.actual_checksum,
        actual_image_url,
        expected_image_url,
        // Pessimistic default; fill_diff_info refines it when it can.
        diff_fraction: 1.0,
        actual_preview: snapshot.actual_preview,
        expected_preview: String::new(),
        diff_image: String::new(),
        diff_image_preview: String::new(),
        log_path: snapshot.log_path,
        execution_time: snapshot.execution_time,
        log_text: snapshot.log_text,
        flaky_frames: snapshot.flaky_frames,
        upload_error,
        peak: snapshot.peak,
    };
    fill_diff_info(
        &mut ci_result,
        snapshot.png_image,
        &state.output_dir,
        &state.cache,
    )
    .await;
    Ok(ci_result)
}
fn write_checksums_update_json(
path: &Path,
device: &str,
results: &[Result<CiReplayResult>],
) -> Result<()> {
#[derive(serde::Serialize)]
struct ChecksumChange<'a> {
device: &'a str,
trace: &'a str,
old_checksum: &'a str,
new_checksum: &'a str,
}
let changes: Vec<ChecksumChange> = results
.iter()
.filter_map(|r| r.as_ref().ok())
.filter(|r| r.actual_checksum != r.expected_checksum)
.map(|r| ChecksumChange {
device,
trace: &r.trace_path,
old_checksum: &r.expected_checksum,
new_checksum: &r.actual_checksum,
})
.collect();
if changes.is_empty() {
return Ok(());
}
let json =
serde_json::to_string_pretty(&changes).context("serializing checksum-changes.json")?;
std::fs::write(path, json.as_bytes()).context("writing checksum-changes.json: {e:#}")
}
/// Entry point for the `replay` subcommand.
///
/// Loads the traces config, filters traces for the target device, then
/// downloads and replays each trace concurrently on a tokio runtime. Writes
/// into `replay.output`: index.html (comparison report), graphs.html (system
/// monitoring), checksums.patch (when checksums changed), and
/// checksum-changes.json. Returns an error if any trace failed.
pub fn replay(replay: &Replay) -> Result<()> {
    let config = traces_config::TracesConfig::load(&replay.config)?;
    // Keep only traces that have an entry for this device and aren't skipped.
    let to_run: Vec<_> = config
        .traces
        .iter()
        .filter_map(|trace| {
            let device = trace.device(&replay.device)?;
            if device.skip {
                return None;
            }
            Some((trace, device))
        })
        .collect();
    // Default cache location follows XDG conventions.
    let cache_dir = if let Some(dir) = &replay.cache_dir {
        dir.clone()
    } else {
        let cache_home = std::env::var_os("XDG_CACHE_HOME")
            .map(|x| Path::new(&x).to_path_buf())
            .or_else(|| std::env::var_os("HOME").map(|x| Path::new(&x).join(".cache")))
            .context("Couldn't find $XDG_CACHE_HOME or $HOME for cache dir, set one manually.")?;
        cache_home.join("gpu-trace-perf")
    };
    let cache = std::sync::Arc::new(trace_downloader::TraceDownloader::new(
        cache_dir,
        replay.disk_limit,
        replay.jwt.as_deref(),
        replay.traces_db.clone(),
    )?);
    if to_run.is_empty() {
        bail!("No traces to run for device {}", replay.device);
    }
    let output_dir = &replay.output;
    std::fs::create_dir_all(output_dir).context("creating output directory")?;
    let output_dir_str = output_dir
        .to_str()
        .context("output dir path is not valid UTF-8")?
        .to_string();
    let traces_s3_url = config.traces_db.download_url.clone();
    // Optionally route downloads through a caching proxy by URL prefixing.
    let download_url = match &replay.download_caching_proxy {
        Some(proxy) => format!("{proxy}{traces_s3_url}"),
        None => traces_s3_url,
    };
    // --fraction/--fraction-start shard the trace list across CI jobs:
    // 1-based start offset, then every `fraction`-th trace.
    let jobs: Vec<TraceJob> = to_run
        .into_iter()
        .skip(replay.fraction_start - 1)
        .step_by(replay.fraction)
        .map(|(trace, device)| {
            // Device-specific replay args are appended after trace-level ones.
            let mut replay_args = trace.replay_args.clone();
            replay_args.extend(device.replay_args.iter().cloned());
            TraceJob {
                path: trace.path.clone(),
                expected_checksum: device.checksum.clone(),
                singlethread: device.singlethread,
                nonloopable: trace.nonloopable,
                replay_args,
            }
        })
        .collect();
    let monitor = SystemMonitor::new();
    let activity = monitor.counters.clone();
    let trace_events = monitor.events.clone();
    let monitor_start_instant = std::time::Instant::now();
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("creating async replay runtime")?;
    let results: Vec<Result<CiReplayResult>> = rt.block_on(async {
        // --jobs 0 means "one replay slot per CPU".
        let replay_jobs = if replay.jobs > 0 {
            replay.jobs
        } else {
            num_cpus::get()
        };
        let state = ReplayState {
            cache,
            dl_sem: std::sync::Arc::new(tokio::sync::Semaphore::new(4)),
            replay_sem: std::sync::Arc::new(tokio::sync::Semaphore::new(replay_jobs)),
            replay_jobs,
            download_url,
            snapshots_url: replay.snapshots_url.clone(),
            job_url: replay.job_url.clone(),
            snapshot_url_must_work: replay.snapshot_url_must_work,
            no_upload: replay.no_upload,
            output_dir: output_dir_str,
            timeout: Duration::from_secs(replay.timeout_secs),
            activity,
            monitor_start: monitor_start_instant,
            trace_events,
        };
        // Spawn every trace at once; the semaphores in ReplayState bound the
        // actual download/replay concurrency.
        let mut join_set: tokio::task::JoinSet<Result<CiReplayResult>> =
            tokio::task::JoinSet::new();
        for job in jobs {
            join_set.spawn(replay_one_trace(state.clone(), job));
        }
        let mut results: Vec<Result<CiReplayResult>> = Vec::new();
        while let Some(r) = join_set.join_next().await {
            results.push(r.context("trace task failed")?);
        }
        Ok::<_, anyhow::Error>(results)
    })?;
    // Log a PASS/FAIL summary and remember whether anything failed.
    let mut any_failure = false;
    for result in &results {
        match result {
            Ok(r) => {
                if !r.flaky_frames.is_empty() {
                    warn!(
                        "FLAKY {}: frames with inconsistent rendering: {}",
                        r.trace_path,
                        r.flaky_frames.join(", ")
                    );
                }
                if let Some(ref e) = r.upload_error {
                    error!("FAIL {}: upload failed: {e}", r.trace_path);
                    any_failure = true;
                } else if r.actual_checksum == r.expected_checksum {
                    info!("PASS {}", r.trace_path);
                } else {
                    error!(
                        "FAIL {} expected:{} actual:{}",
                        r.trace_path, r.expected_checksum, r.actual_checksum
                    );
                    any_failure = true;
                }
            }
            Err(e) => {
                error!("ERROR: {e:#}");
                any_failure = true;
            }
        }
    }
    // Traces with a differing, non-empty checksum feed the config patch.
    let failing: Vec<(String, String)> = results
        .iter()
        .filter_map(|r| r.as_ref().ok())
        .filter(|r| r.actual_checksum != r.expected_checksum && !r.actual_checksum.is_empty())
        .map(|r| (r.trace_path.clone(), r.actual_checksum.clone()))
        .collect();
    // Patch-generation failures are logged but don't fail the run.
    let patch_written = if !failing.is_empty() {
        match generate_checksums_patch(
            &mut File::open(&replay.config).context("opening config toml for patching")?,
            replay
                .config
                .file_name()
                .context("getting filename of config toml")?
                .to_str()
                .context("Getting UTF8 filename of config toml")?,
            &replay.device,
            &failing,
        ) {
            Ok(patch) if !patch.is_empty() => {
                let patch_path = output_dir.join("checksums.patch");
                std::fs::write(&patch_path, patch.as_bytes())
                    .with_context(|| format!("writing {}", patch_path.display()))?;
                true
            }
            Ok(_) => {
                error!("Failed to generate checksums.patch, got empty patch");
                false
            }
            Err(e) => {
                error!("Failed to generate checksums.patch: {e:#}");
                false
            }
        }
    } else {
        false
    };
    // JSON summary is best-effort.
    write_checksums_update_json(
        &output_dir.join("checksum-changes.json"),
        &replay.device,
        &results,
    )
    .unwrap_or_else(|e| error!("writing checksums json: {e}"));
    let (monitor_data, trace_event_list) = monitor.stop();
    match build_graphs_html(&monitor_data, &trace_event_list) {
        Ok(graphs_html) => {
            let graphs_path = output_dir.join("graphs.html");
            if let Err(e) = std::fs::write(&graphs_path, graphs_html.as_bytes()) {
                warn!("Failed to write graphs.html: {e}");
            }
        }
        Err(e) => warn!("Failed to build graphs.html: {e}"),
    }
    // The HTML report includes only traces that produced a result; hard
    // errors were already logged above.
    let ok_results: Vec<&CiReplayResult> = results.iter().filter_map(|r| r.as_ref().ok()).collect();
    let patch_url = if patch_written {
        "checksums.patch".to_string()
    } else {
        String::new()
    };
    let html = build_ci_replay_html(
        &ok_results,
        &replay.device,
        patch_url,
        "graphs.html".to_string(),
    )
    .context("rendering CI replay HTML")?;
    let index_path = output_dir.join("index.html");
    std::fs::write(&index_path, html.as_bytes())
        .with_context(|| format!("writing {}", index_path.display()))?;
    if any_failure {
        anyhow::bail!("Snapshot replay failed");
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Patch generation when `devices` is written as an inline table.
    // Expected unified-diff output is snapshot-tested via insta.
    #[test]
    pub fn diff_inline_tables() {
        let config = r#"
[traces-db]
download-url = "http://unused/"
[[traces]]
path = "valve/half-life-2-v2.trace"
devices = {
radv = {
checksum = "8f5929c82e7d990e8c3d2bea14688224aabbccdd8f5929c82e7d990e8c3d2bea",
},
}
"#;
        let diff = generate_checksums_patch(
            &mut Cursor::new(config),
            "configfile.toml",
            "radv",
            &[(
                "valve/half-life-2-v2.trace".to_string(),
                "d3bf13302b87783558bf330f2a2347e91962c8d2abca22772cce98096f8c646e".to_string(),
            )],
        )
        .unwrap();
        insta::assert_snapshot!(diff);
    }

    // Patch generation when the device checksum lives in a standard
    // `[traces.devices.<name>]` sub-table.
    #[test]
    pub fn diff_tables() {
        let config = r#"
[traces_db]
download-url = "http://unused/"
[[traces]]
path = "glmark2-pulsar.trace"
[traces.devices.test-device]
checksum = "0000000000000000000000000000000000000000000000000000000000000000"
"#;
        let diff = generate_checksums_patch(
            &mut Cursor::new(config),
            "cfg.toml",
            "test-device",
            &[(
                "glmark2-pulsar.trace".to_string(),
                "1111111111111111111111111111111111111111111111111111111111111111".to_string(),
            )],
        )
        .unwrap();
        insta::assert_snapshot!(diff);
    }
}