mod common;
use std::fs;
use std::path::Path;
use std::process::Command;
/// Reports whether `rustup run <version> rustc --version` exits successfully.
///
/// Returns `false` both when the `rustup` process cannot be spawned at all and
/// when it runs but exits with a non-success status.
fn has_toolchain(version: &str) -> bool {
    let probe = Command::new("rustup")
        .arg("run")
        .arg(version)
        .arg("rustc")
        .arg("--version")
        .output();

    match probe {
        Ok(finished) => finished.status.success(),
        // Spawn failure (e.g. no `rustup` on PATH) counts as "not available".
        Err(_) => false,
    }
}
/// Writes a small rayon-based Cargo project into `dir`.
///
/// Three files are produced, in this order:
/// - `rust-toolchain.toml`, selecting channel `1.91.0`;
/// - `Cargo.toml` for a `threaded_alloc_test` binary depending on `rayon = "1"`;
/// - `src/main.rs`, which configures the global rayon pool with at most four
///   threads, spawns one `worker` per thread inside `rayon::scope` (each worker
///   pushes 100 `Vec<u8>` allocations), and finally prints `ok`.
///
/// # Panics
///
/// Panics if the directory or any of the files cannot be created.
fn create_rayon_alloc_project(dir: &Path) {
    const TOOLCHAIN_TOML: &str = r#"[toolchain]
channel = "1.91.0"
"#;
    const CARGO_TOML: &str = r#"[package]
name = "threaded_alloc_test"
version = "0.1.0"
edition = "2021"
[dependencies]
rayon = "1"
[[bin]]
name = "threaded_alloc_test"
path = "src/main.rs"
"#;
    const MAIN_RS: &str = r#"fn main() {
let cpus = std::thread::available_parallelism()
.map(std::num::NonZero::get)
.unwrap_or(1);
let threads = cpus.min(4);
rayon::ThreadPoolBuilder::new()
.num_threads(threads)
.build_global()
.ok();
rayon::scope(|s| {
for i in 0..threads {
s.spawn(move |_| {
worker(i);
});
}
});
println!("ok");
}
fn worker(id: usize) {
let mut vecs: Vec<Vec<u8>> = Vec::new();
for j in 0..100 {
vecs.push(vec![0u8; (id + 1) * (j + 1)]);
}
std::hint::black_box(&vecs);
}
"#;

    // `create_dir_all` also creates `dir` itself when it does not exist yet.
    let src_dir = dir.join("src");
    fs::create_dir_all(&src_dir).unwrap();

    fs::write(dir.join("rust-toolchain.toml"), TOOLCHAIN_TOML).unwrap();
    fs::write(dir.join("Cargo.toml"), CARGO_TOML).unwrap();
    fs::write(src_dir.join("main.rs"), MAIN_RS).unwrap();
}
/// Writes a Cargo project into `dir` that calls `compute` from several threads.
///
/// Two files are produced:
/// - `Cargo.toml` for a `cross-thread-fixture` package (edition 2024) depending
///   on `rayon = "1"`;
/// - `src/main.rs`, which maps `compute` over 100 items with rayon's `par_iter`
///   and then calls `compute` for every item again from `std::thread::scope`
///   threads, one thread per chunk of 25 items.
///
/// # Panics
///
/// Panics if the directory or either file cannot be created.
fn create_cross_thread_project(dir: &Path) {
    const CARGO_TOML: &str = r#"[package]
name = "cross-thread-fixture"
version = "0.1.0"
edition = "2024"
[dependencies]
rayon = "1"
"#;
    const MAIN_RS: &str = r#"use rayon::prelude::*;
fn main() {
let items: Vec<u64> = (0..100).collect();
// Pattern 1: rayon par_iter with instrumented function calls
let results: Vec<u64> = items.par_iter().map(|&x| compute(x)).collect();
println!("par_iter results: {}", results.len());
// Pattern 2: std::thread::scope with instrumented work
std::thread::scope(|s| {
for chunk in items.chunks(25) {
s.spawn(move || {
for &x in chunk {
compute(x);
}
});
}
});
println!("thread::scope done");
}
fn compute(x: u64) -> u64 {
let mut result = x;
for _ in 0..1000 {
result = result.wrapping_mul(31).wrapping_add(7);
}
result
}
"#;

    // `create_dir_all` also creates `dir` itself when it does not exist yet.
    let src_dir = dir.join("src");
    fs::create_dir_all(&src_dir).unwrap();

    fs::write(dir.join("Cargo.toml"), CARGO_TOML).unwrap();
    fs::write(src_dir.join("main.rs"), MAIN_RS).unwrap();
}
/// Builds the rayon allocation fixture with `piano build` (instrumenting
/// `worker` and `main`), runs the resulting binary, and checks that it exits
/// successfully, prints `ok`, and leaves an NDJSON run file mentioning `worker`.
///
/// The fixture pins Rust 1.91.0 through `rust-toolchain.toml`; when that
/// toolchain is not available via `rustup` the test prints a notice and returns
/// early without failing.
#[test]
fn rayon_program_with_alloc_tracking_does_not_crash_on_older_rust() {
    if !has_toolchain("1.91.0") {
        eprintln!("skipping: Rust 1.91.0 not installed (rustup toolchain install 1.91.0)");
        return;
    }

    // Fixture project inside a scratch directory that is removed on drop.
    let scratch = tempfile::tempdir().unwrap();
    let project_dir = scratch.path().join("threaded_alloc_test");
    create_rayon_alloc_project(&project_dir);

    // Build step: `piano build --fn worker --fn main --project <dir> --runtime-path <rt>`.
    let runtime_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("piano-runtime");
    let mut build_cmd = Command::new(env!("CARGO_BIN_EXE_piano"));
    build_cmd
        .arg("build")
        .args(["--fn", "worker"])
        .args(["--fn", "main"])
        .arg("--project")
        .arg(&project_dir)
        .arg("--runtime-path")
        .arg(&runtime_path);
    let build = build_cmd.output().expect("failed to run piano build");

    let stderr = String::from_utf8_lossy(&build.stderr);
    let stdout = String::from_utf8_lossy(&build.stdout);
    assert!(
        build.status.success(),
        "piano build failed:\nstderr: {stderr}\nstdout: {stdout}"
    );

    // The trimmed stdout of the build is used as the path of the built binary.
    let binary_path = stdout.trim();
    assert!(
        Path::new(binary_path).exists(),
        "built binary should exist at: {binary_path}"
    );

    // Run step: execute the instrumented binary with PIANO_RUNS_DIR pointing
    // at a fresh directory.
    let runs_dir = scratch.path().join("runs");
    fs::create_dir_all(&runs_dir).unwrap();

    let mut run_cmd = Command::new(binary_path);
    run_cmd.env("PIANO_RUNS_DIR", &runs_dir);
    let run = run_cmd
        .output()
        .expect("failed to run instrumented binary");

    let run_stderr = String::from_utf8_lossy(&run.stderr);
    let run_stdout = String::from_utf8_lossy(&run.stdout);
    assert!(
        run.status.success(),
        "instrumented binary crashed (likely TLS destructor abort):\nstderr: {run_stderr}\nstdout: {run_stdout}"
    );
    assert!(
        run_stdout.contains("ok"),
        "program should produce correct output, got: {run_stdout}"
    );

    // Inspect the run file selected by the shared test helper.
    let ndjson_path = common::largest_ndjson_file(&runs_dir);
    let ndjson = fs::read_to_string(&ndjson_path).unwrap();
    assert!(
        ndjson.contains("worker"),
        "output should contain worker function data"
    );
}
/// Builds the cross-thread fixture with `piano build --fn compute`, runs the
/// instrumented binary, and checks the aggregated NDJSON statistics.
///
/// The fixture calls `compute` once per item from rayon's `par_iter` and once
/// per item from `std::thread::scope` threads, over 100 items, so exactly 200
/// recorded calls are expected. `main` is not instrumented here and must not
/// show up in the aggregated stats.
#[test]
fn cross_thread_captures_all_calls() {
    // Fixture project inside a scratch directory that is removed on drop.
    let tmp = tempfile::tempdir().unwrap();
    let project_dir = tmp.path().join("cross-thread-fixture");
    create_cross_thread_project(&project_dir);
    // NOTE(review): presumably seeds the project's dependencies ahead of the
    // build; confirm against `common::prepopulate_deps`.
    common::prepopulate_deps(&project_dir, common::rayon_seed());

    let piano_bin = env!("CARGO_BIN_EXE_piano");
    let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
    let runtime_path = manifest_dir.join("piano-runtime");

    let piano_build = Command::new(piano_bin)
        .args(["build", "--fn", "compute", "--project"])
        .arg(&project_dir)
        .arg("--runtime-path")
        .arg(&runtime_path)
        .output()
        .expect("failed to run piano build");

    let stderr = String::from_utf8_lossy(&piano_build.stderr);
    let stdout = String::from_utf8_lossy(&piano_build.stdout);
    assert!(
        piano_build.status.success(),
        "piano build failed:\nstderr: {stderr}\nstdout: {stdout}"
    );

    // The trimmed stdout of the build is used as the path of the built binary.
    // Check it up front (as the sibling rayon test does) so an empty or wrong
    // path is reported clearly instead of as an opaque spawn failure below.
    let binary_path = stdout.trim();
    assert!(
        Path::new(binary_path).exists(),
        "built binary should exist at: {binary_path}"
    );

    let runs_dir = tmp.path().join("runs");
    fs::create_dir_all(&runs_dir).unwrap();

    let run = Command::new(binary_path)
        .env("PIANO_RUNS_DIR", &runs_dir)
        .output()
        .expect("failed to run instrumented binary");
    assert!(
        run.status.success(),
        "instrumented binary failed:\n{}",
        String::from_utf8_lossy(&run.stderr)
    );

    let run_file = common::largest_ndjson_file(&runs_dir);
    let content = fs::read_to_string(&run_file).unwrap();
    assert!(
        content.contains("\"compute\""),
        "should contain compute function. Got:\n{content}"
    );

    // A missing `compute` entry is treated as zero calls so the count
    // assertion below reports the mismatch.
    let stats = common::aggregate_ndjson(&content);
    let compute_calls = stats.get("compute").map(|s| s.calls).unwrap_or(0);
    assert_eq!(
        compute_calls, 200,
        "compute should be called 200 times (100 par_iter + 100 thread::scope), got {compute_calls}"
    );

    assert!(
        !stats.contains_key("main"),
        "main should NOT appear in stats (lifecycle boundary, excluded from name table)"
    );
}