#![forbid(unsafe_code)]
use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use obj::{Config, Db, Document, Id};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
const N_READERS: usize = 8;
const DEFAULT_DURATION_SECS: u64 = 60;
const HEARTBEAT_OPS_GRANULARITY: u64 = 100;
const HEARTBEAT_STALL_SECS: u64 = 30;
const DEFAULT_SEED: u64 = 0xCA7C_AFE0_5717_0220;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct StressDoc {
id_echo: u64,
version: u32,
payload: Vec<u8>,
}
impl Document for StressDoc {
const COLLECTION: &'static str = "stress";
const VERSION: u32 = 1;
}
#[derive(Debug, Clone, Copy)]
enum Op {
Insert,
Update,
Delete,
}
type Heartbeats = Arc<[AtomicU64]>;
#[ignore = "M6 exit gate: long-running concurrent stress test; run via --ignored"]
#[test]
fn concurrent_stress() {
let duration_secs = env::var("OBJ_STRESS_DURATION_SECS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(DEFAULT_DURATION_SECS);
let seed = env::var("OBJ_STRESS_SEED")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(DEFAULT_SEED);
eprintln!("SEED={seed} OBJ_STRESS_DURATION_SECS={duration_secs}");
let dir = TempDir::new().expect("tempdir");
let path = dir.path().join("stress.obj");
let config = Config::default()
.cross_process_lock(false)
.busy_timeout(Duration::from_mins(2));
let db = Arc::new(Db::open_with(&path, config).expect("open"));
let outcome = run_stress(&db, seed, Duration::from_secs(duration_secs));
report(&outcome, seed);
}
struct Outcome {
failure: Option<String>,
writer_ops: u64,
reader_ops: u64,
writer_busy: u64,
reader_corruption_soft: u64,
op_log: Vec<String>,
}
fn report(outcome: &Outcome, seed: u64) {
eprintln!(
"stress run complete: writer_ops={} reader_ops={} writer_busy={} \
reader_corruption_soft={} (M6 #53 race counter — should be 0 \
once #53 lands)",
outcome.writer_ops, outcome.reader_ops, outcome.writer_busy, outcome.reader_corruption_soft,
);
if let Some(msg) = outcome.failure.as_ref() {
let log_dir = PathBuf::from("target").join("stress");
let _ = fs::create_dir_all(&log_dir);
let log_path = log_dir.join(format!("seed-{seed}.log"));
if let Ok(mut f) = fs::File::create(&log_path) {
for line in &outcome.op_log {
let _ = writeln!(f, "{line}");
}
}
panic!("SEED={seed} FAIL: {msg}\nop log: {}", log_path.display());
}
}
fn run_stress(db: &Arc<Db>, seed: u64, duration: Duration) -> Outcome {
let heartbeats: Heartbeats = (0..=N_READERS)
.map(|_| AtomicU64::new(0))
.collect::<Vec<_>>()
.into();
let stop = Arc::new(AtomicBool::new(false));
let id_range = Arc::new(AtomicU64::new(0));
let expected = Arc::new(Mutex::new(HashMap::<u64, ExpectedState>::new()));
let op_log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let writer_ops = Arc::new(AtomicU64::new(0));
let writer_busy = Arc::new(AtomicU64::new(0));
let reader_ops = Arc::new(AtomicU64::new(0));
let reader_corruption = Arc::new(AtomicU64::new(0));
let reader_failures: Arc<[Mutex<Option<String>>]> = (0..N_READERS)
.map(|_| Mutex::new(None))
.collect::<Vec<_>>()
.into();
let mut watchdog_msg: Option<String> = None;
thread::scope(|scope| {
spawn_writer(
scope,
db,
seed,
duration,
&heartbeats,
&stop,
&id_range,
&expected,
&op_log,
&writer_ops,
&writer_busy,
);
spawn_readers(
scope,
db,
seed,
duration,
&heartbeats,
&stop,
&id_range,
&reader_failures,
&reader_ops,
&reader_corruption,
);
watchdog_msg = run_watchdog(&heartbeats, &stop, duration);
});
Outcome {
failure: watchdog_msg.or_else(|| collect_reader_failure(&reader_failures)),
writer_ops: writer_ops.load(Ordering::SeqCst),
reader_ops: reader_ops.load(Ordering::SeqCst),
writer_busy: writer_busy.load(Ordering::SeqCst),
reader_corruption_soft: reader_corruption.load(Ordering::SeqCst),
op_log: Arc::try_unwrap(op_log)
.ok()
.and_then(|m| m.into_inner().ok())
.unwrap_or_default(),
}
}
#[derive(Debug, Clone, Copy)]
enum ExpectedState {
Present,
Deleted,
}
#[allow(clippy::too_many_arguments)]
fn spawn_writer<'scope>(
scope: &'scope thread::Scope<'scope, '_>,
db: &Arc<Db>,
seed: u64,
duration: Duration,
heartbeats: &Heartbeats,
stop: &Arc<AtomicBool>,
id_range: &Arc<AtomicU64>,
expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
op_log: &Arc<Mutex<Vec<String>>>,
writer_ops: &Arc<AtomicU64>,
writer_busy: &Arc<AtomicU64>,
) {
let db = Arc::clone(db);
let heartbeats = Heartbeats::clone(heartbeats);
let stop = Arc::clone(stop);
let id_range = Arc::clone(id_range);
let expected = Arc::clone(expected);
let op_log = Arc::clone(op_log);
let writer_ops = Arc::clone(writer_ops);
let writer_busy = Arc::clone(writer_busy);
scope.spawn(move || {
writer_loop(
&db,
seed,
duration,
&heartbeats,
&stop,
&id_range,
&expected,
&op_log,
&writer_ops,
&writer_busy,
);
});
}
#[allow(clippy::too_many_arguments)]
fn spawn_readers<'scope>(
scope: &'scope thread::Scope<'scope, '_>,
db: &Arc<Db>,
seed: u64,
duration: Duration,
heartbeats: &Heartbeats,
stop: &Arc<AtomicBool>,
id_range: &Arc<AtomicU64>,
reader_failures: &Arc<[Mutex<Option<String>>]>,
reader_ops: &Arc<AtomicU64>,
reader_corruption: &Arc<AtomicU64>,
) {
for r in 0..N_READERS {
let db = Arc::clone(db);
let heartbeats = Heartbeats::clone(heartbeats);
let stop = Arc::clone(stop);
let id_range = Arc::clone(id_range);
let reader_failures = Arc::clone(reader_failures);
let reader_ops = Arc::clone(reader_ops);
let reader_corruption = Arc::clone(reader_corruption);
let reader_seed = seed ^ (0x0FEE_D000_u64.wrapping_add(r as u64));
scope.spawn(move || {
let res = reader_loop(
r,
reader_seed,
&db,
duration,
&heartbeats,
&stop,
&id_range,
&reader_ops,
&reader_corruption,
);
if let Err(e) = res {
if let Ok(mut slot) = reader_failures[r].lock() {
*slot = Some(e);
}
stop.store(true, Ordering::SeqCst);
}
});
}
}
fn run_watchdog(
heartbeats: &Heartbeats,
stop: &Arc<AtomicBool>,
duration: Duration,
) -> Option<String> {
let start = Instant::now();
let snapshot =
|hb: &Heartbeats| -> Vec<u64> { hb.iter().map(|a| a.load(Ordering::Relaxed)).collect() };
let mut last_seen = snapshot(heartbeats);
let mut last_change = Instant::now();
while !stop.load(Ordering::Relaxed) && start.elapsed() < duration {
thread::sleep(Duration::from_millis(500));
let current = snapshot(heartbeats);
if current != last_seen {
last_seen = current;
last_change = Instant::now();
} else if last_change.elapsed() > Duration::from_secs(HEARTBEAT_STALL_SECS) {
stop.store(true, Ordering::SeqCst);
return Some(format!(
"deadlock watchdog: no heartbeat advance in {HEARTBEAT_STALL_SECS}s; \
current = {current:?}",
));
}
}
stop.store(true, Ordering::SeqCst);
None
}
fn collect_reader_failure(failures: &[Mutex<Option<String>>]) -> Option<String> {
for (r, slot) in failures.iter().enumerate() {
if let Ok(mut g) = slot.lock() {
if let Some(msg) = g.take() {
return Some(format!("reader {r}: {msg}"));
}
}
}
None
}
#[allow(clippy::too_many_arguments)]
fn writer_loop(
db: &Arc<Db>,
seed: u64,
duration: Duration,
heartbeats: &Heartbeats,
stop: &Arc<AtomicBool>,
id_range: &Arc<AtomicU64>,
expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
op_log: &Arc<Mutex<Vec<String>>>,
writer_ops: &Arc<AtomicU64>,
writer_busy: &Arc<AtomicU64>,
) {
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let start = Instant::now();
let mut iter: u64 = 0;
while !stop.load(Ordering::Relaxed) && start.elapsed() < duration {
iter = iter.saturating_add(1);
if iter.is_multiple_of(HEARTBEAT_OPS_GRANULARITY) {
heartbeats[0].store(iter, Ordering::Relaxed);
}
let op = choose_op(&mut rng);
match perform_writer_op(db, op, &mut rng, id_range, expected) {
Ok(()) => {
writer_ops.fetch_add(1, Ordering::Relaxed);
}
Err(obj::Error::Busy { .. }) => {
writer_busy.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
if let Ok(mut log) = op_log.lock() {
log.push(format!("writer iter {iter}: op {op:?} err: {e:?}"));
}
stop.store(true, Ordering::SeqCst);
return;
}
}
}
heartbeats[0].store(iter, Ordering::Relaxed);
}
fn choose_op(rng: &mut ChaCha8Rng) -> Op {
let n: u32 = rng.random_range(0..100);
match n {
0..60 => Op::Insert,
60..85 => Op::Update,
_ => Op::Delete,
}
}
fn perform_writer_op(
db: &Db,
op: Op,
rng: &mut ChaCha8Rng,
id_range: &Arc<AtomicU64>,
expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> obj::Result<()> {
match op {
Op::Insert => writer_insert(db, rng, id_range, expected),
Op::Update => writer_update(db, rng, expected),
Op::Delete => writer_delete(db, rng, expected),
}
}
fn writer_insert(
db: &Db,
rng: &mut ChaCha8Rng,
id_range: &Arc<AtomicU64>,
expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> obj::Result<()> {
let payload = random_payload(rng);
let inserted = db.transaction(|tx| {
let coll = tx.collection::<StressDoc>()?;
let id = coll.insert(StressDoc {
id_echo: 0,
version: 1,
payload,
})?;
coll.update(id, |d: &mut StressDoc| {
d.id_echo = id.get();
})?;
Ok(id)
})?;
if let Ok(mut map) = expected.lock() {
map.insert(inserted.get(), ExpectedState::Present);
}
id_range.fetch_max(inserted.get(), Ordering::SeqCst);
Ok(())
}
fn writer_update(
db: &Db,
rng: &mut ChaCha8Rng,
expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> obj::Result<()> {
let Some((id, new_version)) = pick_existing_id(rng, expected) else {
return Ok(());
};
let payload = random_payload(rng);
let r = db.update::<StressDoc, _>(
Id::try_new(id).expect("nonzero by construction"),
|d: &mut StressDoc| {
d.payload.clone_from(&payload);
d.version = new_version;
},
);
match r {
Ok(()) => {
if let Ok(mut map) = expected.lock() {
map.insert(id, ExpectedState::Present);
}
Ok(())
}
Err(obj::Error::DocumentNotFound { .. }) => Ok(()),
Err(e) => Err(e),
}
}
fn writer_delete(
db: &Db,
rng: &mut ChaCha8Rng,
expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> obj::Result<()> {
let Some((id, _)) = pick_existing_id(rng, expected) else {
return Ok(());
};
let existed = db.delete::<StressDoc>(Id::try_new(id).expect("nonzero"))?;
if existed {
if let Ok(mut map) = expected.lock() {
map.insert(id, ExpectedState::Deleted);
}
}
Ok(())
}
fn random_payload(rng: &mut ChaCha8Rng) -> Vec<u8> {
let len: usize = rng.random_range(8..256);
let mut buf = vec![0u8; len];
rng.fill(buf.as_mut_slice());
buf
}
fn pick_existing_id(
rng: &mut ChaCha8Rng,
expected: &Arc<Mutex<HashMap<u64, ExpectedState>>>,
) -> Option<(u64, u32)> {
let Ok(map) = expected.lock() else {
return None;
};
let present: Vec<u64> = map
.iter()
.filter_map(|(k, v)| matches!(v, ExpectedState::Present).then_some(*k))
.collect();
if present.is_empty() {
return None;
}
let idx = rng.random_range(0..present.len());
let id = present[idx];
let next_version: u32 = rng.random_range(2..u32::MAX);
Some((id, next_version))
}
#[allow(clippy::too_many_arguments)]
fn reader_loop(
r: usize,
seed: u64,
db: &Arc<Db>,
duration: Duration,
heartbeats: &Heartbeats,
stop: &Arc<AtomicBool>,
id_range: &Arc<AtomicU64>,
reader_ops: &Arc<AtomicU64>,
reader_corruption: &Arc<AtomicU64>,
) -> Result<(), String> {
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let start = Instant::now();
let mut iter: u64 = 0;
while !stop.load(Ordering::Relaxed) && start.elapsed() < duration {
iter = iter.saturating_add(1);
if iter.is_multiple_of(HEARTBEAT_OPS_GRANULARITY) {
heartbeats[r + 1].store(iter, Ordering::Relaxed);
}
reader_step(db, &mut rng, id_range, reader_ops, reader_corruption)?;
}
heartbeats[r + 1].store(iter, Ordering::Relaxed);
Ok(())
}
fn reader_step(
db: &Db,
rng: &mut ChaCha8Rng,
id_range: &Arc<AtomicU64>,
reader_ops: &Arc<AtomicU64>,
reader_corruption: &Arc<AtomicU64>,
) -> Result<(), String> {
let high = id_range.load(Ordering::SeqCst);
if high == 0 {
thread::yield_now();
return Ok(());
}
let pick = rng.random_range(1..=high);
let Some(id) = Id::try_new(pick) else {
return Ok(());
};
let observed = match reader_get(db, id) {
ReaderGet::Observed(v) => v,
ReaderGet::SoftRetry => return Ok(()),
ReaderGet::SoftCorruption => {
reader_corruption.fetch_add(1, Ordering::Relaxed);
return Ok(());
}
ReaderGet::Hard(msg) => return Err(format!("read_transaction get({pick}): {msg}")),
};
reader_ops.fetch_add(1, Ordering::Relaxed);
check_observed(observed.as_ref(), pick)
}
enum ReaderGet {
Observed(Option<StressDoc>),
SoftRetry,
SoftCorruption,
Hard(String),
}
fn reader_get(db: &Db, id: Id) -> ReaderGet {
let result = db.read_transaction(|tx| {
let coll = match tx.collection::<StressDoc>() {
Ok(c) => c,
Err(obj::Error::CollectionNotFound { .. }) => return Ok(None),
Err(e) => return Err(e),
};
coll.get(id)
});
match result {
Ok(v) => ReaderGet::Observed(v),
Err(obj::Error::Busy { .. }) => ReaderGet::SoftRetry,
Err(obj::Error::Corruption { .. }) => ReaderGet::SoftCorruption,
Err(e) => ReaderGet::Hard(format!("{e:?}")),
}
}
fn check_observed(observed: Option<&StressDoc>, pick: u64) -> Result<(), String> {
let Some(doc) = observed else { return Ok(()) };
if doc.id_echo != pick {
return Err(format!(
"torn read: queried id={pick} observed id_echo={} version={}",
doc.id_echo, doc.version,
));
}
Ok(())
}
const _: () = {
fn assert_send_sync<T: Send + Sync>() {}
let _ = assert_send_sync::<Outcome>;
};