#![cfg(feature = "send-values")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tishlang_bytecode::compile;
use tishlang_core::{NativeFn, Value};
use tishlang_vm::Vm;
type Handler = NativeFn;
fn export(vm: &Vm, name: &str) -> Handler {
match vm.get_global(name).unwrap_or_else(|| panic!("global `{name}` not found")) {
Value::Function(f) => f,
other => panic!("global `{name}` is not a function: {other:?}"),
}
}
fn read_num(obj: &Value, field: &str) -> f64 {
match obj {
Value::Object(o) => match o.borrow().strings.get(field) {
Some(Value::Number(n)) => *n,
other => panic!("stats.{field} is not a number: {other:?}"),
},
other => panic!("stats is not an object: {other:?}"),
}
}
#[test]
fn concurrent_handlers_mutating_shared_module_state_do_not_deadlock() {
let src = r#"
let active = 0
let maxActive = 0
let served = 0
fn handleRequest(req) {
active = active + 1
served = served + 1
if (active > maxActive) { maxActive = active }
let i = 0
while (i < 2000) { i = i + 1 } // brief CPU hold so handlers overlap
active = active - 1
return { status: 200, body: "ok" }
}
fn getStats() {
return { active: active, maxActive: maxActive, served: served }
}
handler = handleRequest
stats = getStats
"#;
let program = tishlang_parser::parse(src).expect("parse");
let chunk = compile(&program).expect("compile");
let mut vm = Vm::new();
vm.run(&chunk).expect("run top-level");
let handler = export(&vm, "handler");
let stats = export(&vm, "stats");
const THREADS: usize = 12;
const ITERS: usize = 100;
let total = THREADS * ITERS;
let done = Arc::new(AtomicUsize::new(0));
let start = Instant::now();
let mut handles = Vec::with_capacity(THREADS);
for t in 0..THREADS {
let h = handler.clone();
let done = Arc::clone(&done);
handles.push(std::thread::spawn(move || {
for i in 0..ITERS {
let resp = h.call(&[Value::Number((t * ITERS + i) as f64)]);
assert!(matches!(resp, Value::Object(_)), "handler must return a response object");
done.fetch_add(1, Ordering::Relaxed);
}
}));
}
let mut last = 0usize;
let mut last_change = Instant::now();
while done.load(Ordering::Relaxed) < total {
let cur = done.load(Ordering::Relaxed);
if cur != last {
last = cur;
last_change = Instant::now();
}
assert!(
last_change.elapsed() < Duration::from_secs(15),
"DEADLOCK regression: concurrent handlers stalled at {cur}/{total} (no progress for 15s)"
);
std::thread::sleep(Duration::from_millis(20));
}
for h in handles {
h.join().expect("a handler thread panicked");
}
let s = stats.call(&[]);
let served = read_num(&s, "served");
let max_active = read_num(&s, "maxActive");
let active = read_num(&s, "active");
eprintln!(
"completed {total} concurrent calls / {THREADS} threads in {:?}; served={served}, maxActive={max_active}, active(final)={active}",
start.elapsed()
);
assert!(served > 0.0 && served <= total as f64, "served={served} out of plausible range (0, {total}]");
assert!(max_active >= 2.0, "handlers never overlapped (maxActive={max_active}); test did not exercise concurrency");
}