#![forbid(unsafe_code)]
use std::path::Path;
use std::process::Command;
use std::sync::{Arc, Barrier};
use std::thread;
use proptest::prelude::*;
use tempfile::TempDir;
use vyre_runtime::pipeline_cache::DiskCache;
use vyre_runtime::{
InMemoryPipelineCache, LayeredPipelineCache, PipelineCacheStore, PipelineFingerprint,
};
/// Builds a deterministic fingerprint for tests from a one-byte seed.
///
/// Every byte equals `seed`, except the final byte, which is
/// `seed.wrapping_mul(7)`.
fn fp_from_seed(seed: u8) -> PipelineFingerprint {
    let tail = seed.wrapping_mul(7);
    let mut raw = [seed; 32];
    raw[raw.len() - 1] = tail;
    PipelineFingerprint(raw)
}
/// Returns the all-zero fingerprint (32 bytes of `0x00`).
fn dead_fp() -> PipelineFingerprint {
    const ZEROED: [u8; 32] = [0; 32];
    PipelineFingerprint(ZEROED)
}
/// 100 writer threads `put` distinct 64 KiB payloads under the same
/// fingerprint while one reader polls `get` 10 000 times.
///
/// Every value the reader observes, and the value left in the cache at the
/// end, must be byte-identical to one of the complete payloads — i.e. no torn
/// or mixed reads. A `get` that returns `None` is accepted at any point.
#[test]
fn disk_cache_atomic_rename_race_100_threads() {
    const WRITERS: usize = 100;
    const PAYLOAD_LEN: usize = 64 * 1024;
    const READ_ATTEMPTS: usize = 10_000;

    let dir = TempDir::new().unwrap();
    let cache = Arc::new(DiskCache::new(dir.path()).unwrap());
    let fp = fp_from_seed(42);

    // Payload `n` consists solely of the byte `n`, so payloads are pairwise distinct.
    let payloads: Vec<Vec<u8>> = (0..WRITERS).map(|n| vec![n as u8; PAYLOAD_LEN]).collect();

    // All writers plus the single reader are released together.
    let start = Arc::new(Barrier::new(WRITERS + 1));

    let writers: Vec<_> = payloads
        .iter()
        .cloned()
        .map(|payload| {
            let cache: Arc<DiskCache> = Arc::clone(&cache);
            let start = Arc::clone(&start);
            thread::spawn(move || {
                start.wait();
                cache.put(fp, payload);
            })
        })
        .collect();

    let reader = {
        let cache: Arc<DiskCache> = Arc::clone(&cache);
        thread::spawn(move || {
            start.wait();
            let mut seen: Vec<Vec<u8>> = Vec::new();
            for _ in 0..READ_ATTEMPTS {
                seen.extend(cache.get(&fp));
                // Yields whenever the number of collected observations is a
                // multiple of 1000 — which includes zero, i.e. on every
                // iteration until the first successful read.
                if seen.len() % 1000 == 0 {
                    thread::yield_now();
                }
            }
            seen
        })
    };

    for writer in writers {
        writer.join().expect("writer thread must not panic");
    }
    let seen = reader.join().expect("reader thread must not panic");

    for (idx, obs) in seen.iter().enumerate() {
        let is_complete = payloads.iter().any(|p| p == obs);
        assert!(
            is_complete,
            "torn read at observation #{idx}: {} bytes do not match any complete payload",
            obs.len()
        );
    }

    // The surviving entry (if any) must also be one whole payload.
    if let Some(bytes) = cache.get(&fp) {
        let is_complete = payloads.iter().any(|p| *p == bytes);
        assert!(
            is_complete,
            "final state is {} bytes, not a complete payload",
            bytes.len()
        );
    }
}
proptest! {
    // 256 cases per run; failure persistence is switched off, so no
    // regression file is written.
    #![proptest_config(ProptestConfig {
        cases: 256,
        failure_persistence: Some(Box::new(
            proptest::test_runner::FileFailurePersistence::Off
        )),
        ..ProptestConfig::default()
    })]

    // Writes 1..1024 arbitrary bytes directly to `<root>/<fp hex>.bin`,
    // bypassing `put`, and requires `get` to return `None` for that
    // fingerprint.
    // NOTE(review): this presumes arbitrary bytes can never form a valid
    // on-disk entry — confirm against the DiskCache file format.
    #[test]
    fn corrupted_bin_file_rejected(
        seed in 0u8..=255,
        partial in prop::collection::vec(0u8..=255, 1..1024)
    ) {
        let dir = TempDir::new().unwrap();
        let cache = DiskCache::new(dir.path()).unwrap();
        let fp = fp_from_seed(seed);

        let file_name = format!("{}.bin", fp.hex());
        std::fs::write(cache.root().join(file_name), &partial).unwrap();

        let served_len = cache.get(&fp).map(|bytes| bytes.len());
        prop_assert!(
            served_len.is_none(),
            "corrupted file returned Some({} bytes) instead of None",
            served_len.unwrap_or(0)
        );
    }
}
/// A temporary directory with a size-limited `tmpfs` mounted on top of it.
///
/// The mount is undone when the guard is dropped (see the `Drop` impl).
struct TmpfsGuard {
    /// Mount point; owned so the directory outlives the mount.
    dir: TempDir,
}

impl TmpfsGuard {
    /// Creates a fresh temporary directory and mounts a `tmpfs` of
    /// `size_kb` kilobytes on it.
    ///
    /// The mount is first attempted through non-interactive `sudo -n`; if
    /// that cannot be spawned or exits unsuccessfully, `mount` is invoked
    /// directly.
    ///
    /// # Errors
    ///
    /// Returns a message if the temporary directory cannot be created, if
    /// the direct `mount` cannot be spawned, or if the direct `mount` exits
    /// unsuccessfully (its stderr is included in the message).
    fn with_size_kb(size_kb: usize) -> Result<Self, String> {
        let dir = TempDir::new().map_err(|e| e.to_string())?;

        // Arguments shared by both attempts; the mount point is appended last.
        let size_opt = format!("size={}k", size_kb);
        let mount_args = ["-t", "tmpfs", "-o", size_opt.as_str(), "tmpfs"];

        let via_sudo = Command::new("sudo")
            .args(["-n", "mount"])
            .args(mount_args)
            .arg(dir.path())
            .output();
        if matches!(&via_sudo, Ok(out) if out.status.success()) {
            return Ok(Self { dir });
        }

        let direct = Command::new("mount")
            .args(mount_args)
            .arg(dir.path())
            .output()
            .map_err(|e| format!("failed to spawn mount: {}", e))?;
        if !direct.status.success() {
            let stderr = String::from_utf8_lossy(&direct.stderr);
            return Err(format!(
                "tmpfs mount failed (tried sudo and direct mount): {}",
                stderr
            ));
        }
        Ok(Self { dir })
    }

    /// Path of the mount point.
    fn path(&self) -> &Path {
        self.dir.path()
    }
}
impl Drop for TmpfsGuard {
    /// Best-effort lazy unmount (`umount -l`) of the guard's mount point.
    ///
    /// Tries non-interactive `sudo -n umount -l` first and falls back to a
    /// direct `umount -l` only when the sudo attempt could not be spawned or
    /// exited unsuccessfully. All failures are ignored and stderr is
    /// discarded, because nothing useful can be done about them in `drop`.
    fn drop(&mut self) {
        let unmounted_via_sudo = Command::new("sudo")
            .args(["-n", "umount", "-l"])
            .arg(self.dir.path())
            .stderr(std::process::Stdio::null())
            .status()
            .map(|status| status.success())
            .unwrap_or(false);
        if unmounted_via_sudo {
            // The mount is already detached; a second umount could only fail.
            return;
        }

        let _ = Command::new("umount")
            .args(["-l"])
            .arg(self.dir.path())
            .stderr(std::process::Stdio::null())
            .status();
    }
}
/// Puts a 100 KiB artifact into a cache rooted on a 4 KB tmpfs.
///
/// The `put` must return without panicking, and a subsequent `get` for the
/// same fingerprint must yield `None`. The test panics up front if the tmpfs
/// cannot be mounted.
#[test]
fn disk_full_tmpfs_put_no_panic() {
    const TMPFS_KB: usize = 4;
    const ARTIFACT_LEN: usize = 100 * 1024;

    let tmpfs = TmpfsGuard::with_size_kb(TMPFS_KB).expect(
        "this test requires mount privileges to create a 4 KB tmpfs; \
         configure passwordless sudo or run in a mount-capable environment",
    );
    let cache = DiskCache::new(tmpfs.path()).unwrap();
    let fp = fp_from_seed(1);

    // Far larger than the filesystem backing the cache.
    cache.put(fp, vec![0xABu8; ARTIFACT_LEN]);

    let served = cache.get(&fp);
    assert!(
        served.is_none(),
        "get() returned Some after a disk-full put - partial artifact must not be served"
    );
}
/// Plants a symlink at the cache entry path for the all-zero fingerprint and
/// requires `get` to return `None` instead of the link target's contents.
///
/// The link points at `/etc/passwd` when that file exists; otherwise at a
/// synthetic file created inside the temporary directory.
#[cfg(unix)]
#[test]
fn symlink_attack_must_not_read_etc_passwd() {
    let dir = TempDir::new().unwrap();
    let cache = DiskCache::new(dir.path()).unwrap();
    let fp = dead_fp();

    let passwd = Path::new("/etc/passwd");
    let target: std::path::PathBuf = if passwd.exists() {
        passwd.to_path_buf()
    } else {
        let decoy = dir.path().join("synthetic_secret");
        std::fs::write(&decoy, b"SYNTHETIC-SECRET-DATA").unwrap();
        decoy
    };

    // Occupy the location where the cache looks for this fingerprint.
    let link = cache.root().join(format!("{}.bin", fp.hex()));
    std::os::unix::fs::symlink(&target, &link).unwrap();

    let leaked = cache.get(&fp);
    assert!(
        leaked.is_none(),
        "symlink attack succeeded: get() read {} bytes from {:?} instead of refusing",
        leaked.as_ref().map_or(0, |v| v.len()),
        target
    );
}
/// Placeholder compiled only on non-Unix targets, where the symlink test is
/// not built; it merely asserts that the target is indeed not Unix.
#[cfg(not(unix))]
#[test]
fn symlink_attack_not_applicable_on_non_unix() {
    let built_for_unix = cfg!(unix);
    assert!(!built_for_unix, "symlink adversarial suite is Unix-only");
}
/// 50 writer threads and 50 reader threads hammer the same five fingerprints.
///
/// Writer `i` stores payload `i % 5` under fingerprint `i % 5` twenty times;
/// each reader sweeps all five fingerprints 200 times. Every value returned
/// by `get` — during the run and afterwards — must be byte-identical to one
/// of the five known 128 KiB payloads. `None` is accepted at any point.
///
/// The fingerprint list and payload set are shared between threads through
/// `Arc` rather than deep-cloned per thread; only the buffer handed to `put`
/// (which takes ownership) is copied.
#[test]
fn concurrent_put_get_50_50_same_fp_set() {
    const WRITERS: usize = 50;
    const READERS: usize = 50;

    let dir = TempDir::new().unwrap();
    let cache = Arc::new(DiskCache::new(dir.path()).unwrap());
    let fps: Arc<Vec<PipelineFingerprint>> = Arc::new((0..5).map(fp_from_seed).collect());
    let payloads: Arc<Vec<Vec<u8>>> =
        Arc::new((0..5).map(|i| vec![i as u8; 128 * 1024]).collect());

    // Writers + readers + the main thread all rendezvous before starting.
    let barrier = Arc::new(Barrier::new(WRITERS + READERS + 1));
    let mut handles = Vec::with_capacity(WRITERS + READERS);

    for i in 0..WRITERS {
        let cache: Arc<DiskCache> = Arc::clone(&cache);
        let fps = Arc::clone(&fps);
        let payloads = Arc::clone(&payloads);
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            barrier.wait();
            // Each writer always targets the same key/payload pair.
            let fp = fps[i % fps.len()];
            let payload = &payloads[i % payloads.len()];
            for _ in 0..20 {
                cache.put(fp, payload.clone());
            }
        }));
    }

    for _ in 0..READERS {
        let cache: Arc<DiskCache> = Arc::clone(&cache);
        let fps = Arc::clone(&fps);
        let payloads = Arc::clone(&payloads);
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            barrier.wait();
            for _ in 0..200 {
                for fp in fps.iter() {
                    if let Some(bytes) = cache.get(fp) {
                        assert!(
                            payloads.contains(&bytes),
                            "torn read: {} bytes not in known payload set",
                            bytes.len()
                        );
                    }
                }
            }
        }));
    }

    barrier.wait();
    for h in handles {
        h.join().expect("thread must not panic");
    }

    // Whatever survived the race must still be a whole payload.
    for fp in fps.iter() {
        if let Some(bytes) = cache.get(fp) {
            assert!(
                payloads.contains(&bytes),
                "final state is {} bytes, not a known complete payload",
                bytes.len()
            );
        }
    }
}
/// Layers an empty disk cache (layer 0) over an in-memory cache (layer 1)
/// that already holds an artifact, then checks two things:
///
/// 1. `get` on the layered cache returns the artifact held only by layer 1.
/// 2. After `put` on the layered cache, the disk layer holds the new value
///    while the in-memory layer still holds the old one.
#[test]
fn layered_cache_disk_over_memory_fallthrough_and_put_routing() {
    let dir = TempDir::new().unwrap();
    let disk = Arc::new(DiskCache::new(dir.path()).unwrap());
    let mem = Arc::new(InMemoryPipelineCache::new());
    let fp = fp_from_seed(7);

    let lower_bytes: &[u8] = b"layer-1-artifact";
    let upper_bytes: &[u8] = b"layer-0-artifact";

    // Seed only the in-memory layer before composing the stack.
    mem.put(fp, lower_bytes.to_vec());
    let layered = LayeredPipelineCache::new(vec![disk.clone(), mem.clone()]);
    assert_eq!(layered.get(&fp).unwrap(), lower_bytes.to_vec());

    layered.put(fp, upper_bytes.to_vec());
    assert_eq!(disk.get(&fp).unwrap(), upper_bytes.to_vec());
    assert_eq!(mem.get(&fp).unwrap(), lower_bytes.to_vec());
}
/// Constructs an `InMemoryPipelineCache` and immediately discards it.
///
/// This test is not gated on any cargo feature, so it is built in every
/// feature configuration of this file.
#[test]
fn compiles_without_remote_feature() {
    drop(InMemoryPipelineCache::new());
}
/// With the `remote` feature enabled, stacks a disk cache over a
/// `RemoteCache` constructed with the URL `http://127.0.0.1:1`, and checks
/// that a `put` through the layered cache is readable both from the disk
/// layer directly and through the layered cache.
#[cfg(feature = "remote")]
#[test]
fn remote_cache_in_layered_cache_put_and_get() {
    let dir = TempDir::new().unwrap();
    let disk = Arc::new(DiskCache::new(dir.path()).unwrap());
    let remote = Arc::new(vyre_runtime::RemoteCache::new("http://127.0.0.1:1"));
    let layered = LayeredPipelineCache::new(vec![disk.clone(), remote]);

    let fp = fp_from_seed(3);
    let artifact = b"only-on-disk".to_vec();
    layered.put(fp, artifact.clone());

    assert_eq!(disk.get(&fp).unwrap(), artifact);
    assert_eq!(layered.get(&fp).unwrap(), artifact);
}