#![cfg(feature = "parallel-handler")]
#![allow(unexpected_cfgs)]
#[path = "support/cdp_mock.rs"]
mod cdp_mock;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use chromiumoxide::handler::HandlerConfig;
use chromiumoxide::Browser;
use chromiumoxide_cdp::cdp::js_protocol::runtime::EvaluateParams;
use tokio::time::timeout;
/// Handle to a background sampling task that tracks the longest observed
/// delay between its own consecutive wake-ups.
struct Heartbeat {
    /// Set to `true` to ask the sampling task to leave its loop.
    stop: Arc<AtomicBool>,
    /// Largest gap between two consecutive wake-ups seen so far, in
    /// milliseconds.
    max_gap_ms: Arc<AtomicU64>,
    /// Join handle of the spawned sampling task.
    handle: tokio::task::JoinHandle<()>,
}
impl Heartbeat {
    /// Spawns (via `tokio::spawn`) a task that repeatedly sleeps for `tick`
    /// and records the largest wall-clock gap, in milliseconds, between two
    /// consecutive wake-ups.
    ///
    /// Each measured gap includes the `tick` sleep itself, so on a healthy
    /// runtime the recorded maximum is roughly `tick`; anything well above
    /// that means the task was not rescheduled promptly.
    fn spawn(tick: Duration) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let max_gap_ms = Arc::new(AtomicU64::new(0));

        let handle = tokio::spawn({
            let stop = Arc::clone(&stop);
            let max_gap_ms = Arc::clone(&max_gap_ms);
            async move {
                let mut last = Instant::now();
                while !stop.load(Ordering::Relaxed) {
                    tokio::time::sleep(tick).await;
                    let now = Instant::now();
                    // `as_millis` yields a `u128`; saturate rather than
                    // silently truncating with an `as` cast.
                    let gap =
                        u64::try_from(now.duration_since(last).as_millis()).unwrap_or(u64::MAX);
                    // Single atomic read-modify-write instead of a separate
                    // load / compare / store sequence.
                    max_gap_ms.fetch_max(gap, Ordering::Relaxed);
                    last = now;
                }
            }
        });

        Self {
            stop,
            max_gap_ms,
            handle,
        }
    }

    /// Signals the sampling task to stop, waits for it to finish, and returns
    /// the largest gap (in milliseconds) it recorded.
    ///
    /// The task only checks the stop flag between sleeps, so this can take up
    /// to one more `tick` to return. A join error from the task is ignored;
    /// whatever maximum was stored up to that point is returned.
    async fn stop(self) -> u64 {
        self.stop.store(true, Ordering::Relaxed);
        let _ = self.handle.await;
        self.max_gap_ms.load(Ordering::Relaxed)
    }
}
/// Opens `PAGES` pages concurrently against the CDP mock, fires
/// `CMDS_PER_PAGE` evaluate commands at each of them in parallel, and checks
/// that (a) every command round-trips and (b) two heartbeat tasks running on
/// the same two-worker runtime never observe a wake-up gap above
/// `MAX_STALL_MS`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn parallel_handler_runtime_stays_responsive_under_load() {
    const PAGES: usize = 16;
    const CMDS_PER_PAGE: usize = 32;
    const TICK_FAST: Duration = Duration::from_millis(10);
    const TICK_SLOW: Duration = Duration::from_millis(17);
    const MAX_STALL_MS: u64 = 250;

    let mock = cdp_mock::CdpMock::spawn().await;
    let config = HandlerConfig {
        request_timeout: Duration::from_secs(10),
        ..Default::default()
    };
    let (browser, handler) = Browser::connect_with_config(mock.ws_url(), config)
        .await
        .expect("connect to mock");
    let _handler_task = tokio::spawn(handler.run_parallel());
    let browser = Arc::new(browser);

    // Short pause before the heartbeats start sampling.
    tokio::time::sleep(Duration::from_millis(50)).await;
    let fast_beat = Heartbeat::spawn(TICK_FAST);
    let slow_beat = Heartbeat::spawn(TICK_SLOW);

    // Phase 1: create all pages concurrently, each bounded by its own timeout.
    let page_tasks: Vec<_> = (0..PAGES)
        .map(|_| {
            let browser = Arc::clone(&browser);
            tokio::spawn(async move {
                timeout(Duration::from_secs(5), browser.new_page("about:blank"))
                    .await
                    .expect("new_page timeout — possible router/session deadlock")
                    .expect("new_page")
            })
        })
        .collect();
    let pages: Vec<_> = futures_util::future::join_all(page_tasks)
        .await
        .into_iter()
        .map(|joined| joined.expect("join"))
        .collect();

    // Phase 2: one spawned task per (page, command) pair.
    let mut command_tasks = Vec::with_capacity(PAGES * CMDS_PER_PAGE);
    for page in &pages {
        command_tasks.extend((0..CMDS_PER_PAGE).map(|i| {
            let page = page.clone();
            tokio::spawn(async move {
                timeout(
                    Duration::from_secs(10),
                    page.execute(EvaluateParams::new(format!("'cmd-{i}'"))),
                )
                .await
                .expect("execute timeout — possible deadlock")
                .map(|_| ())
            })
        }));
    }

    let results = timeout(
        Duration::from_secs(30),
        futures_util::future::join_all(command_tasks),
    )
    .await
    .expect("workload deadlocked — overall timeout fired");

    // A task that panicked fails the test here; command errors merely do not
    // count towards the success total.
    let ok = results
        .into_iter()
        .map(|joined| joined.expect("join"))
        .filter(Result::is_ok)
        .count();
    assert_eq!(
        ok,
        PAGES * CMDS_PER_PAGE,
        "all parallel commands across {PAGES} pages should round-trip"
    );

    let max_fast = fast_beat.stop().await;
    let max_slow = slow_beat.stop().await;
    let max_gap = max_fast.max(max_slow);
    assert!(
        max_gap <= MAX_STALL_MS,
        "runtime stalled for {max_gap}ms (fast tick {}ms / max {max_fast}ms; \
         slow tick {}ms / max {max_slow}ms; limit {MAX_STALL_MS}ms) — \
         a SessionTask/Router future is likely holding a worker with a blocking call \
         or lock contention",
        TICK_FAST.as_millis(),
        TICK_SLOW.as_millis(),
    );

    drop(browser);
}
/// Sequentially opens a page, runs a few evaluate commands on it and drops
/// it, `ROUNDS` times over, and fails if the whole loop does not finish
/// within 30 seconds.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn parallel_handler_no_deadlock_under_session_churn() {
    const ROUNDS: usize = 32;
    const CMDS_PER_ROUND: usize = 4;

    let mock = cdp_mock::CdpMock::spawn().await;
    let config = HandlerConfig {
        request_timeout: Duration::from_secs(5),
        ..Default::default()
    };
    let (browser, handler) = Browser::connect_with_config(mock.ws_url(), config)
        .await
        .expect("connect");
    let _handler_task = tokio::spawn(handler.run_parallel());
    let browser = Arc::new(browser);

    let churn = async {
        for round in 0..ROUNDS {
            let page = match browser.new_page("about:blank").await {
                Ok(page) => page,
                Err(err) => panic!("new_page failed at round {round}: {err}"),
            };
            for i in 0..CMDS_PER_ROUND {
                let script = format!("'churn-{round}-{i}'");
                if let Err(err) = page.execute(EvaluateParams::new(script)).await {
                    panic!("execute failed at {round}/{i}: {err}");
                }
            }
            // Release the page handle before the next round opens a new one.
            drop(page);
        }
    };

    timeout(Duration::from_secs(30), churn)
        .await
        .expect("session churn deadlocked — open/use/drop loop did not complete");

    drop(browser);
}
/// Runs `WORKERS` tasks concurrently, each doing `ROUNDS_PER_WORKER`
/// open-page / evaluate / drop cycles against the shared browser, and fails
/// if they do not all finish within 30 seconds or if any worker panics.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn parallel_handler_no_deadlock_under_concurrent_churn() {
    const WORKERS: usize = 8;
    const ROUNDS_PER_WORKER: usize = 8;

    let mock = cdp_mock::CdpMock::spawn().await;
    let config = HandlerConfig {
        request_timeout: Duration::from_secs(5),
        ..Default::default()
    };
    let (browser, handler) = Browser::connect_with_config(mock.ws_url(), config)
        .await
        .expect("connect");
    let _handler_task = tokio::spawn(handler.run_parallel());
    let browser = Arc::new(browser);

    let workers: Vec<_> = (0..WORKERS)
        .map(|w| {
            let browser = Arc::clone(&browser);
            tokio::spawn(async move {
                for round in 0..ROUNDS_PER_WORKER {
                    let page = match browser.new_page("about:blank").await {
                        Ok(page) => page,
                        Err(err) => panic!("worker {w} new_page round {round}: {err}"),
                    };
                    let script = format!("'w{w}-r{round}'");
                    if let Err(err) = page.execute(EvaluateParams::new(script)).await {
                        panic!("worker {w} execute round {round}: {err}");
                    }
                    drop(page);
                }
            })
        })
        .collect();

    let outcomes = timeout(
        Duration::from_secs(30),
        futures_util::future::join_all(workers),
    )
    .await
    .expect("concurrent churn deadlocked — overall timeout fired");

    // Surface any panic raised inside a worker task.
    outcomes
        .into_iter()
        .for_each(|outcome| outcome.expect("worker task join"));

    drop(browser);
}
/// Runs the parallel-command workload on a dedicated runtime with the
/// poll-time histogram enabled, then asserts that no worker recorded a single
/// task poll lasting `STALL_THRESHOLD` or longer.
///
/// Only compiled when building with `--cfg tokio_unstable`, which the runtime
/// metrics histogram API requires.
#[cfg(tokio_unstable)]
#[test]
fn parallel_handler_no_long_polls_runtime_metrics() {
    use tokio::runtime::{Builder, HistogramConfiguration};

    const STALL_THRESHOLD: Duration = Duration::from_millis(50);
    // The default poll-time histogram is linear with 10 buckets of 100µs, so
    // its highest bucket starts at 900µs and a 50ms threshold could never be
    // reached. Use 10ms-wide buckets instead: bucket 5 starts exactly at
    // `STALL_THRESHOLD`, and the last bucket starts at 190ms.
    const BUCKET_WIDTH: Duration = Duration::from_millis(10);
    const NUM_BUCKETS: usize = 20;
    const PAGES: usize = 16;
    const CMDS_PER_PAGE: usize = 32;

    let runtime = Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_time_histogram_configuration(HistogramConfiguration::linear(
            BUCKET_WIDTH,
            NUM_BUCKETS,
        ))
        .build()
        .expect("build runtime with poll-time histogram");

    runtime.block_on(async {
        let mock = cdp_mock::CdpMock::spawn().await;
        let cfg = HandlerConfig {
            request_timeout: Duration::from_secs(10),
            ..Default::default()
        };
        let (browser, handler) = Browser::connect_with_config(mock.ws_url(), cfg)
            .await
            .expect("connect to mock");
        let _h = tokio::spawn(handler.run_parallel());
        let browser = Arc::new(browser);

        // Create all pages concurrently.
        let mut create_tasks = Vec::with_capacity(PAGES);
        for _ in 0..PAGES {
            let b = browser.clone();
            create_tasks.push(tokio::spawn(async move {
                timeout(Duration::from_secs(5), b.new_page("about:blank"))
                    .await
                    .expect("new_page timeout")
                    .expect("new_page")
            }));
        }
        let pages = futures_util::future::join_all(create_tasks)
            .await
            .into_iter()
            .map(|r| r.expect("join"))
            .collect::<Vec<_>>();

        // One spawned task per (page, command) pair.
        let mut cmd_tasks = Vec::with_capacity(PAGES * CMDS_PER_PAGE);
        for page in &pages {
            for i in 0..CMDS_PER_PAGE {
                let p = page.clone();
                cmd_tasks.push(tokio::spawn(async move {
                    timeout(
                        Duration::from_secs(10),
                        p.execute(EvaluateParams::new(format!("'cmd-{i}'"))),
                    )
                    .await
                    .expect("execute timeout")
                    .map(|_| ())
                }));
            }
        }

        // Individual command outcomes are intentionally not inspected here;
        // this test only looks at how long the polls took.
        let _ = timeout(
            Duration::from_secs(30),
            futures_util::future::join_all(cmd_tasks),
        )
        .await
        .expect("workload deadlocked");

        drop(browser);
        // Brief pause after dropping the browser, before the metrics are read.
        tokio::time::sleep(Duration::from_millis(50)).await;
    });

    let metrics = runtime.metrics();
    let num_workers = metrics.num_workers();
    let num_buckets = metrics.poll_time_histogram_num_buckets();
    assert!(num_workers > 0 && num_buckets > 0);

    // Guard against a vacuous check: the histogram must have at least one
    // bucket whose lower bound reaches the stall threshold.
    let top_bucket_start = metrics
        .poll_time_histogram_bucket_range(num_buckets - 1)
        .start;
    assert!(
        top_bucket_start >= STALL_THRESHOLD,
        "poll-time histogram tops out at {top_bucket_start:?}, below the \
         {STALL_THRESHOLD:?} stall threshold — the long-poll check would be vacuous"
    );

    let mut over_threshold: u64 = 0;
    let mut worst_lower_us: u128 = 0;
    for w in 0..num_workers {
        for b in 0..num_buckets {
            let count = metrics.poll_time_histogram_bucket_count(w, b);
            if count == 0 {
                continue;
            }
            let range = metrics.poll_time_histogram_bucket_range(b);
            if range.start >= STALL_THRESHOLD {
                over_threshold += count;
                worst_lower_us = worst_lower_us.max(range.start.as_micros());
            }
        }
    }

    assert_eq!(
        over_threshold, 0,
        "poll-time histogram: {over_threshold} polls took ≥ {}ms (worst bucket lower bound: {}µs) — \
         a future is doing blocking work without yielding",
        STALL_THRESHOLD.as_millis(),
        worst_lower_us,
    );
}