use chrono::{TimeZone, Utc};
use grex_core::fs::ManifestLock;
use grex_core::manifest::{append_event, read_all, Event, SCHEMA_VERSION};
use std::fs::OpenOptions;
use std::io::Write;
use std::panic::{self, AssertUnwindSafe};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
use tempfile::tempdir;
/// Builds an `Event::Add` fixture stamped with the current UTC time.
///
/// `tag` is used for both the event id and its path; the URL, pack type and
/// schema version are fixed fixture values.
fn mk_event(tag: &str) -> Event {
    let now = Utc::now();
    Event::Add {
        ts: now,
        id: tag.into(),
        url: "u".into(),
        path: tag.into(),
        pack_type: "declarative".into(),
        schema_version: SCHEMA_VERSION.into(),
    }
}
/// Builds an `Event::Add` fixture whose timestamp is pinned to `secs` seconds
/// (zero nanoseconds) as interpreted by `Utc.timestamp_opt`.
///
/// `tag` is used for both the event id and its path.
///
/// # Panics
///
/// Panics if `timestamp_opt` does not yield a single timestamp for `secs`.
fn mk_event_at(tag: &str, secs: i64) -> Event {
    let pinned = Utc.timestamp_opt(secs, 0).unwrap();
    Event::Add {
        ts: pinned,
        id: tag.into(),
        url: "u".into(),
        path: tag.into(),
        pack_type: "declarative".into(),
        schema_version: SCHEMA_VERSION.into(),
    }
}
/// The pair of on-disk locations each test works with.
struct Paths {
    /// Location of the manifest file (`grex.jsonl`).
    manifest: PathBuf,
    /// Location of the lock file (`.grex.lock`).
    lock: PathBuf,
}

/// Derives the manifest and lock file paths that live directly inside `dir`.
fn paths(dir: &Path) -> Paths {
    let manifest = dir.join("grex.jsonl");
    let lock = dir.join(".grex.lock");
    Paths { manifest, lock }
}
/// Four threads, each opening its own `ManifestLock`, append 25 events apiece.
/// The manifest must then contain exactly 100 events, all with distinct ids.
#[test]
fn four_threads_append_under_lock() {
    const WRITERS: usize = 4;
    const EVENTS_EACH: usize = 25;

    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());
    let manifest = Arc::new(manifest);

    let mut workers = Vec::with_capacity(WRITERS);
    for tid in 0..WRITERS {
        let manifest_path = Arc::clone(&manifest);
        let lock_path = lock.clone();
        workers.push(thread::spawn(move || {
            let mut guard = ManifestLock::open(&manifest_path, &lock_path).unwrap();
            for i in 0..EVENTS_EACH {
                let event = mk_event(&format!("t{tid}-p{i}"));
                guard.write(|| append_event(&manifest_path, &event)).unwrap().unwrap();
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }

    let events = read_all(&manifest).unwrap();
    assert_eq!(events.len(), WRITERS * EVENTS_EACH);

    // Sorting then deduplicating shrinks the list only if an id repeats.
    let mut ids: Vec<_> = events.iter().map(|e| e.id().clone()).collect();
    ids.sort();
    let before_dedup = ids.len();
    ids.dedup();
    assert_eq!(ids.len(), before_dedup, "no duplicate or torn events");
}
/// A writer that panics after emitting half a JSON line must not wedge the
/// lock: a later writer can still append, the earlier complete event is still
/// readable, and the lock can be taken once more afterwards.
#[test]
fn writer_panic_midwrite_releases_lock() {
    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());

    // Seed the manifest with one complete event.
    {
        let mut guard = ManifestLock::open(&manifest, &lock).unwrap();
        guard.write(|| append_event(&manifest, &mk_event("pre"))).unwrap().unwrap();
    }

    // Writer that leaves a truncated line behind and panics inside the
    // write closure; the panic is caught so the thread itself exits cleanly.
    let crasher = {
        let manifest_path = manifest.clone();
        let lock_path = lock.clone();
        thread::spawn(move || {
            let mut guard = ManifestLock::open(&manifest_path, &lock_path).unwrap();
            let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
                guard
                    .write(|| {
                        let mut file =
                            OpenOptions::new().append(true).open(&manifest_path).unwrap();
                        file.write_all(b"{\"op\":\"add\",\"ts\":\"2026-04").unwrap();
                        panic!("simulated mid-write crash");
                    })
                    .unwrap();
            }));
            drop(outcome);
        })
    };
    crasher.join().unwrap();

    // A fresh writer must be able to take the lock and append.
    let follower = {
        let manifest_path = manifest.clone();
        let lock_path = lock.clone();
        thread::spawn(move || {
            let mut guard = ManifestLock::open(&manifest_path, &lock_path).unwrap();
            guard.write(|| append_event(&manifest_path, &mk_event("post"))).unwrap().unwrap();
        })
    };
    follower.join().unwrap();

    let events = read_all(&manifest).unwrap();
    let ids: Vec<_> = events.iter().map(|e| e.id().as_str().to_owned()).collect();
    assert!(ids.iter().any(|id| id == "pre"), "prior event must survive panic");

    // The lock is still usable from the main thread.
    let mut guard = ManifestLock::open(&manifest, &lock).unwrap();
    guard.write(|| ()).unwrap();
}
/// Environment variable that switches `cross_process_concurrent_append` into
/// child mode. Its value is the `tag|manifest|lock` spec that
/// `run_child_append` parses.
const CHILD_ENV: &str = "GREX_CONCURRENT_CHILD";
/// Number of events each child process appends in `run_child_append`.
const CHILD_EVENTS: usize = 20;
/// Two child processes append to the same manifest concurrently; the parent
/// then expects `2 * CHILD_EVENTS` events with distinct ids.
///
/// The children are this same test binary re-invoked with `CHILD_ENV` set, in
/// which case the test body hands off to `run_child_append` and never returns.
#[test]
fn cross_process_concurrent_append() {
    // Child mode: the callee exits the process, so nothing below runs.
    match std::env::var(CHILD_ENV) {
        Ok(spec) => run_child_append(&spec),
        Err(_) => {}
    }

    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());
    let exe = std::env::current_exe().expect("current_exe");

    // Re-run only this test in a child, passing the paths via the env spec.
    let launch = |tag: &str| {
        let spec = format!("{tag}|{}|{}", manifest.display(), lock.display());
        let mut cmd = std::process::Command::new(&exe);
        cmd.args(["--exact", "cross_process_concurrent_append", "--nocapture"]);
        cmd.env(CHILD_ENV, spec);
        cmd.spawn().expect("spawn child")
    };

    let mut first = launch("A");
    let mut second = launch("B");
    let first_status = first.wait().expect("wait c1");
    let second_status = second.wait().expect("wait c2");
    assert!(first_status.success(), "child 1 failed: {:?}", first_status);
    assert!(second_status.success(), "child 2 failed: {:?}", second_status);

    let events = read_all(&manifest).unwrap();
    assert_eq!(events.len(), 2 * CHILD_EVENTS, "combined event count");

    let mut ids: Vec<_> = events.iter().map(|e| e.id().clone()).collect();
    ids.sort();
    let before_dedup = ids.len();
    ids.dedup();
    assert_eq!(ids.len(), before_dedup, "ids unique across processes");
}
/// Child-process half of `cross_process_concurrent_append`.
///
/// `spec` is `tag|manifest|lock`. The child opens the lock, appends
/// `CHILD_EVENTS` events with ids `proc{tag}-{i}`, and exits with status 0.
///
/// # Panics
///
/// Panics if `spec` has fewer than three `|`-separated fields, or if opening
/// the lock, taking the write lock, or appending fails.
fn run_child_append(spec: &str) -> ! {
    let mut fields = spec.split('|');
    let tag = fields.next().expect("tag");
    let manifest = fields.next().map(PathBuf::from).expect("manifest");
    let lock = fields.next().map(PathBuf::from).expect("lock");

    let mut guard = ManifestLock::open(&manifest, &lock).expect("child: open lock");
    for i in 0..CHILD_EVENTS {
        let event = mk_event(&format!("proc{tag}-{i}"));
        let appended = guard.write(|| append_event(&manifest, &event)).expect("child: write lock");
        appended.expect("child: append");
    }
    std::process::exit(0)
}
/// With a truncated JSON line already at the end of the manifest, three
/// concurrent locked appends must still leave a readable file that contains
/// the earlier complete event and all three new ones.
#[test]
fn partial_line_append_then_concurrent_preserves_prior() {
    const WRITERS: usize = 3;

    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());

    // One complete event written through the lock.
    {
        let mut guard = ManifestLock::open(&manifest, &lock).unwrap();
        guard.write(|| append_event(&manifest, &mk_event("prior"))).unwrap().unwrap();
    }
    // A torn line written directly to the file, bypassing the lock.
    {
        let mut raw = OpenOptions::new().append(true).open(&manifest).unwrap();
        raw.write_all(b"{\"op\":\"add\",\"ts\":\"2026").unwrap();
    }

    let shared_manifest = Arc::new(manifest.clone());
    let gate = Arc::new(Barrier::new(WRITERS));
    let mut workers = Vec::with_capacity(WRITERS);
    for tid in 0..WRITERS {
        let manifest_path = Arc::clone(&shared_manifest);
        let lock_path = lock.clone();
        let gate = Arc::clone(&gate);
        workers.push(thread::spawn(move || {
            let mut guard = ManifestLock::open(&manifest_path, &lock_path).unwrap();
            // Line all writers up so their appends contend.
            gate.wait();
            guard
                .write(|| append_event(&manifest_path, &mk_event(&format!("post-{tid}"))))
                .unwrap()
                .unwrap();
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }

    let events = read_all(&manifest).expect("heal-on-append: read must succeed");
    let mut ids: Vec<_> = events.iter().map(|e| e.id().as_str().to_owned()).collect();
    ids.sort();
    assert!(ids.iter().any(|id| id == "prior"), "prior event survives heal");
    let post_count = ids.iter().filter(|id| id.starts_with("post-")).count();
    assert_eq!(post_count, 3, "all three concurrent appends landed cleanly: {ids:?}");
}
/// Stress run: 64 threads append 10 events each through their own lock
/// handle. The manifest must hold exactly 640 distinct events and the
/// spawn-to-join phase must finish in under 30 seconds.
#[test]
fn high_thread_count_stress() {
    const THREADS: usize = 64;
    const PER_THREAD: usize = 10;

    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());
    let manifest = Arc::new(manifest);

    let started_at = std::time::Instant::now();
    let mut workers = Vec::with_capacity(THREADS);
    for tid in 0..THREADS {
        let manifest_path = Arc::clone(&manifest);
        let lock_path = lock.clone();
        workers.push(thread::spawn(move || {
            let mut guard = ManifestLock::open(&manifest_path, &lock_path).unwrap();
            for i in 0..PER_THREAD {
                let event = mk_event(&format!("stress-{tid}-{i}"));
                guard.write(|| append_event(&manifest_path, &event)).unwrap().unwrap();
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }
    // Measured before reading back so verification is not part of the budget.
    let elapsed = started_at.elapsed();

    let events = read_all(&manifest).unwrap();
    assert_eq!(events.len(), THREADS * PER_THREAD, "exact count");

    let mut ids: Vec<_> = events.iter().map(|e| e.id().clone()).collect();
    ids.sort();
    let before_dedup = ids.len();
    ids.dedup();
    assert_eq!(ids.len(), before_dedup, "no duplicates under heavy contention");

    let budget = Duration::from_secs(30);
    assert!(elapsed < budget, "stress run took too long: {elapsed:?}");
}
/// A reader that shows up while a writer is inside its `write` closure is
/// expected to wait for the writer, and to see the event written under that
/// lock once its own `read` runs.
///
/// Sequencing relies on a two-party barrier: the writer reaches it from
/// inside the `write` closure, then appends and sleeps for 200 ms. The main
/// thread starts the reader only after passing the barrier, and the test
/// asserts that the reader took at least 50 ms to return.
#[test]
fn reader_blocks_during_active_write() {
    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());
    let writer_started = Arc::new(Barrier::new(2));

    let mp = manifest.clone();
    let lp = lock.clone();
    let ws = Arc::clone(&writer_started);
    let a = thread::spawn(move || {
        let mut l = ManifestLock::open(&mp, &lp).unwrap();
        l.write(|| {
            // Signal the main thread from inside the write closure.
            ws.wait();
            append_event(&mp, &mk_event("during-write")).unwrap();
            // Stay inside the closure long enough for the reader to arrive.
            thread::sleep(Duration::from_millis(200));
        })
        .unwrap();
    });

    // Proceed only once the writer is inside its write closure.
    writer_started.wait();
    let t_before_read = std::time::Instant::now();
    let lp2 = lock.clone();
    let mp2 = manifest.clone();
    let b = thread::spawn(move || {
        let mut l = ManifestLock::open(&mp2, &lp2).unwrap();
        l.read(|| read_all(&mp2).unwrap()).unwrap()
    });
    let events = b.join().unwrap();
    let waited = t_before_read.elapsed();
    a.join().unwrap();

    // The threshold is well below the 200 ms hold to tolerate scheduling jitter.
    assert!(
        waited >= Duration::from_millis(50),
        "reader did not appear to block: waited {waited:?}"
    );
    let ids: Vec<_> = events.iter().map(|e| e.id().as_str().to_owned()).collect();
    assert!(ids.contains(&"during-write".to_owned()));
}
/// Unix: if the manifest is unlinked while a lock handle is still open, the
/// next locked append is expected to succeed and the manifest read back
/// afterwards should contain only that new event.
#[cfg(unix)]
#[test]
fn file_deleted_midlock_defined_behavior_unix() {
    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());
    let mut guard = ManifestLock::open(&manifest, &lock).unwrap();

    guard.write(|| append_event(&manifest, &mk_event("pre"))).unwrap().unwrap();

    // Remove the manifest out from under the open lock handle.
    std::fs::remove_file(&manifest).unwrap();
    assert!(!manifest.exists());

    guard.write(|| append_event(&manifest, &mk_event("after-unlink"))).unwrap().unwrap();

    let events = read_all(&manifest).unwrap();
    let ids: Vec<_> = events.iter().map(|e| e.id().as_str().to_owned()).collect();
    let expected = vec![String::from("after-unlink")];
    assert_eq!(ids, expected);
}
/// Windows: attempt to delete the manifest while a lock handle is open (the
/// outcome of the delete is ignored), then check that a locked append still
/// succeeds and leaves at least one readable event.
#[cfg(windows)]
#[test]
fn file_deleted_midlock_defined_behavior_windows() {
    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());
    let mut guard = ManifestLock::open(&manifest, &lock).unwrap();

    guard.write(|| append_event(&manifest, &mk_event("pre"))).unwrap().unwrap();

    // Best effort: the result of the delete is deliberately discarded.
    let _ = std::fs::remove_file(&manifest);

    guard.write(|| append_event(&manifest, &mk_event("after-delete"))).unwrap().unwrap();

    let events = read_all(&manifest).unwrap();
    let has_events = !events.is_empty();
    assert!(has_events, "append after delete should yield events");
}
/// Windows: records that a plain file append which bypasses `ManifestLock`
/// currently succeeds while another thread sits inside a `write` closure, and
/// that the lock is still usable afterwards.
#[cfg(windows)]
#[test]
fn windows_advisory_vs_mandatory_lock() {
    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());

    // `entered` fires once the holder is inside its write closure;
    // `release` lets it leave again.
    let entered = Arc::new(Barrier::new(2));
    let release = Arc::new(Barrier::new(2));

    let holder = {
        let manifest_path = manifest.clone();
        let lock_path = lock.clone();
        let entered = Arc::clone(&entered);
        let release = Arc::clone(&release);
        thread::spawn(move || {
            let mut guard = ManifestLock::open(&manifest_path, &lock_path).unwrap();
            guard
                .write(|| {
                    entered.wait();
                    release.wait();
                })
                .unwrap();
        })
    };

    entered.wait();
    // Write straight to the manifest without going through the lock.
    let mut raw = OpenOptions::new().create(true).append(true).open(&manifest).unwrap();
    let bypass_ok = raw.write_all(b"sneaky\n").is_ok();
    drop(raw);
    release.wait();
    holder.join().unwrap();

    assert!(bypass_ok, "advisory-only gap expected: bypass write should currently succeed");

    let mut guard = ManifestLock::open(&manifest, &lock).unwrap();
    guard.write(|| ()).unwrap();
}
/// A panic raised inside a `write` closure (and caught in the same thread)
/// must not prevent a later thread from running its own `write` closure.
#[test]
fn lock_drop_on_unwind_releases() {
    use std::sync::atomic::{AtomicBool, Ordering};

    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());

    // First thread: panic inside the write closure, catch the unwind.
    let panicker = {
        let manifest_path = manifest.clone();
        let lock_path = lock.clone();
        thread::spawn(move || {
            let mut guard = ManifestLock::open(&manifest_path, &lock_path).unwrap();
            let unwound = panic::catch_unwind(AssertUnwindSafe(|| {
                guard.write(|| panic!("inside-write-closure")).unwrap();
            }));
            drop(unwound);
        })
    };
    panicker.join().unwrap();

    // Second thread: sets the flag from inside its own write closure.
    let acquired = Arc::new(AtomicBool::new(false));
    let successor = {
        let flag = Arc::clone(&acquired);
        let manifest_path = manifest.clone();
        let lock_path = lock.clone();
        thread::spawn(move || {
            let mut guard = ManifestLock::open(&manifest_path, &lock_path).unwrap();
            guard.write(|| flag.store(true, Ordering::SeqCst)).unwrap();
        })
    };
    successor.join().unwrap();

    assert!(acquired.load(Ordering::SeqCst), "next thread acquired lock after panic");
}
/// Four events sharing one timestamp are appended in sequence; reading back
/// must return all four, in the order they were appended.
#[test]
fn timestamp_collision_not_corrupting() {
    const SHARED_TS: i64 = 1_700_000_000;

    let dir = tempdir().unwrap();
    let Paths { manifest, lock } = paths(dir.path());
    let mut guard = ManifestLock::open(&manifest, &lock).unwrap();

    let tags = ["a", "b", "c", "d"];
    for tag in tags {
        let event = mk_event_at(tag, SHARED_TS);
        guard.write(|| append_event(&manifest, &event)).unwrap().unwrap();
    }

    let events = read_all(&manifest).unwrap();
    assert_eq!(events.len(), tags.len(), "no merge/drop on equal ts");

    let ids: Vec<_> = events.iter().map(|e| e.id().as_str().to_owned()).collect();
    let expected: Vec<String> = tags.iter().map(|tag| tag.to_string()).collect();
    assert_eq!(ids, expected, "append order preserved under identical ts");
}