use fsys::{builder, JournalHandle, JournalReader, Lsn};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
static C: AtomicU64 = AtomicU64::new(0);
fn tmp_path(tag: &str) -> PathBuf {
let n = C.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!(
"fsys_journal_concurrent_{}_{}_{tag}",
std::process::id(),
n
))
}
struct Cleanup(PathBuf);
impl Drop for Cleanup {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
#[test]
fn concurrent_appends_produce_no_data_corruption() {
const THREADS: usize = 16;
const RECORDS_PER_THREAD: usize = 1000;
const RECORD_SIZE: usize = 64;
let path = tmp_path("no_corruption");
let _g = Cleanup(path.clone());
let fs = builder().build().expect("handle");
let log = Arc::new(fs.journal(&path).expect("journal"));
let barrier = Arc::new(Barrier::new(THREADS));
let mut joins = Vec::new();
for tid in 0..THREADS {
let log = log.clone();
let barrier = barrier.clone();
joins.push(std::thread::spawn(move || {
barrier.wait();
let mut record = vec![0u8; RECORD_SIZE];
for i in 0..RECORDS_PER_THREAD {
let tag = format!("T{tid:03}-R{i:06}");
let bytes = tag.as_bytes();
record[..bytes.len()].copy_from_slice(bytes);
let _lsn = log.append(&record).expect("append");
}
}));
}
for j in joins {
j.join().expect("thread join");
}
let final_lsn = log.next_lsn();
log.sync_through(final_lsn).expect("final sync");
let frame_overhead: usize = 12;
let expected_bytes = THREADS * RECORDS_PER_THREAD * (RECORD_SIZE + frame_overhead);
let actual_bytes = std::fs::metadata(&path).expect("stat").len() as usize;
assert_eq!(
actual_bytes, expected_bytes,
"file size mismatch — concurrent appends interleaved or lost data"
);
let mut reader = JournalReader::open(&path).expect("open reader");
let mut tag_counts: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
for record in reader.iter() {
let record = record.expect("record decode");
let trimmed = record
.payload
.iter()
.position(|&b| b == 0)
.map(|p| &record.payload[..p])
.unwrap_or(&record.payload);
let tag = std::str::from_utf8(trimmed).expect("utf-8 tag");
*tag_counts.entry(tag.to_string()).or_insert(0) += 1;
}
assert_eq!(
reader.tail_state(),
fsys::JournalTailState::CleanEnd,
"tail should be clean after reader iteration"
);
assert_eq!(
tag_counts.len(),
THREADS * RECORDS_PER_THREAD,
"expected every distinct tag to appear once; saw {} distinct tags",
tag_counts.len()
);
for (tag, count) in &tag_counts {
assert_eq!(
*count, 1,
"tag {tag} appeared {count} times — duplicate or missing record"
);
}
}
#[test]
fn group_commit_coalesces_concurrent_sync_calls() {
const THREADS: usize = 32;
let path = tmp_path("group_commit");
let _g = Cleanup(path.clone());
let fs = builder().build().expect("handle");
let log = Arc::new(fs.journal(&path).expect("journal"));
let mut joins = Vec::new();
for tid in 0..THREADS {
let log = log.clone();
joins.push(std::thread::spawn(move || {
let lsn = log.append(format!("T{tid:03}").as_bytes()).expect("append");
log.sync_through(lsn).expect("sync_through");
assert!(log.synced_lsn() >= lsn);
lsn
}));
}
let mut max_lsn = Lsn::ZERO;
for j in joins {
let lsn = j.join().expect("join");
if lsn > max_lsn {
max_lsn = lsn;
}
}
assert!(log.synced_lsn() >= max_lsn);
}
#[test]
fn close_and_reopen_resumes_correctly() {
let path = tmp_path("reopen");
let _g = Cleanup(path.clone());
let fs = builder().build().expect("handle");
let initial_size: u64;
{
let log = fs.journal(&path).expect("journal1");
let _ = log.append(b"first record").expect("a1");
let _ = log.append(b"second record").expect("a2");
let lsn = log.append(b"third").expect("a3");
log.sync_through(lsn).expect("sync");
initial_size = log.next_lsn().as_u64();
log.close().expect("close");
}
let log2 = fs.journal(&path).expect("journal2");
assert_eq!(log2.next_lsn().as_u64(), initial_size);
assert_eq!(log2.synced_lsn().as_u64(), initial_size);
let new_lsn = log2.append(b"fourth").expect("a4");
assert!(new_lsn.as_u64() > initial_size);
log2.close().expect("close2");
let mut reader = JournalReader::open(&path).expect("open reader");
let payloads: Vec<Vec<u8>> = reader.iter().map(|r| r.expect("record").payload).collect();
assert_eq!(
payloads,
vec![
b"first record".to_vec(),
b"second record".to_vec(),
b"third".to_vec(),
b"fourth".to_vec(),
]
);
assert_eq!(reader.tail_state(), fsys::JournalTailState::CleanEnd);
}
#[test]
fn synced_lsn_is_monotonic_under_concurrent_load() {
const THREADS: usize = 8;
const ROUNDS: usize = 100;
let path = tmp_path("monotonic");
let _g = Cleanup(path.clone());
let fs = builder().build().expect("handle");
let log = Arc::new(fs.journal(&path).expect("journal"));
let observed_max = Arc::new(AtomicU64::new(0));
let mut joins = Vec::new();
for _ in 0..THREADS {
let log = log.clone();
let observed_max = observed_max.clone();
joins.push(std::thread::spawn(move || {
for _ in 0..ROUNDS {
let lsn = log.append(b"x").expect("append");
log.sync_through(lsn).expect("sync");
let now = log.synced_lsn().as_u64();
let prev = observed_max.fetch_max(now, Ordering::SeqCst);
assert!(now >= lsn.as_u64());
let _ = prev;
}
}));
}
for j in joins {
j.join().expect("join");
}
}
#[test]
fn append_returns_lsn_equal_to_file_size_after_each_op() {
let path = tmp_path("size_invariant");
let _g = Cleanup(path.clone());
let fs = builder().build().expect("handle");
let log = fs.journal(&path).expect("journal");
for i in 0..50 {
let payload = vec![0xABu8; 17 + i]; let lsn = log.append(&payload).expect("append");
log.sync_through(lsn).expect("sync");
let actual_size = std::fs::metadata(&path).expect("stat").len();
assert_eq!(
actual_size,
lsn.as_u64(),
"after sync_through, file size should equal LSN (= sum of framed record sizes) at iter {i}"
);
}
let _ = JournalHandle::close;
}
#[test]
fn concurrent_appends_thread_count_ladder() {
const RECORDS_PER_THREAD: usize = 250;
const RECORD_SIZE: usize = 32;
for &thread_count in &[1usize, 2, 4, 8, 16, 32] {
let path = tmp_path(&format!("ladder_{thread_count}"));
let _g = Cleanup(path.clone());
let fs = builder().build().expect("handle");
let log = Arc::new(fs.journal(&path).expect("journal"));
let barrier = Arc::new(Barrier::new(thread_count));
let mut handles = Vec::with_capacity(thread_count);
for tid in 0..thread_count {
let log = Arc::clone(&log);
let barrier = Arc::clone(&barrier);
handles.push(std::thread::spawn(move || {
let payload = vec![tid as u8; RECORD_SIZE];
let _ = barrier.wait();
let mut last_lsn = Lsn::ZERO;
for _ in 0..RECORDS_PER_THREAD {
last_lsn = log.append(&payload).expect("append");
}
last_lsn
}));
}
let mut last_lsns: Vec<Lsn> = Vec::with_capacity(thread_count);
for h in handles {
last_lsns.push(h.join().expect("thread join"));
}
let high = last_lsns.iter().copied().max().unwrap_or(Lsn::ZERO);
log.sync_through(high).expect("final sync");
let actual_size = std::fs::metadata(&path).expect("stat").len();
assert_eq!(
actual_size,
high.as_u64(),
"thread_count={thread_count}: file size should equal high LSN"
);
let mut reader = JournalReader::open(&path).expect("reader open");
let mut record_count = 0usize;
let mut per_thread_count = vec![0usize; thread_count];
for rec in reader.iter() {
let r = rec.expect("read record");
assert_eq!(
r.payload.len(),
RECORD_SIZE,
"thread_count={thread_count}: every record should be RECORD_SIZE bytes"
);
let tid = r.payload[0] as usize;
assert!(
tid < thread_count,
"thread_count={thread_count}: record payload byte {tid} >= thread_count {thread_count}"
);
assert!(
r.payload.iter().all(|&b| b == tid as u8),
"thread_count={thread_count}: record payload not uniform (interleaving bug?)"
);
per_thread_count[tid] += 1;
record_count += 1;
}
assert_eq!(
record_count,
thread_count * RECORDS_PER_THREAD,
"thread_count={thread_count}: missing records"
);
for (tid, count) in per_thread_count.iter().enumerate() {
assert_eq!(
*count, RECORDS_PER_THREAD,
"thread_count={thread_count}, tid={tid}: per-thread record count mismatch"
);
}
}
}