use std::{
sync::{
atomic::{AtomicUsize, Ordering},
mpsc,
},
thread,
time::{Duration, Instant},
};
use crate::{page::Page, stealth::StealthProfile};
pub struct NavigateResult {
pub html: String,
pub elapsed: Duration,
pub error: Option<String>,
}
struct Job {
url: String,
profile: Option<StealthProfile>,
result_tx: tokio::sync::oneshot::Sender<NavigateResult>,
}
struct WorkerHandle {
tx: mpsc::Sender<Job>,
_thread: thread::JoinHandle<()>,
}
pub struct ParallelPager {
workers: Vec<WorkerHandle>,
next_worker: AtomicUsize,
}
impl ParallelPager {
pub fn new(num_workers: usize) -> Self {
assert!(num_workers > 0, "ParallelPager needs at least 1 worker");
let workers = (0..num_workers)
.map(|i| {
let (tx, rx) = mpsc::channel::<Job>();
let thread = thread::Builder::new()
.name(format!("hpx-browser-pager-{i}"))
.stack_size(64 * 1024 * 1024)
.spawn(move || worker_main(rx))
.unwrap_or_else(|e| panic!("failed to spawn pager worker: {e}"));
WorkerHandle {
tx,
_thread: thread,
}
})
.collect();
Self {
workers,
next_worker: AtomicUsize::new(0),
}
}
pub async fn navigate(
&self,
url: impl Into<String>,
profile: Option<StealthProfile>,
) -> NavigateResult {
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let job = Job {
url: url.into(),
profile,
result_tx,
};
let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.workers.len();
match self.workers[idx].tx.send(job) {
Ok(_) => {}
Err(send_err) => {
return NavigateResult {
html: String::new(),
elapsed: Duration::default(),
error: Some(format!(
"worker {idx} unavailable (likely panicked): {send_err}"
)),
};
}
}
match result_rx.await {
Ok(r) => r,
Err(_) => NavigateResult {
html: String::new(),
elapsed: Duration::default(),
error: Some("worker dropped result sender (panic during navigate)".to_string()),
},
}
}
pub fn num_workers(&self) -> usize {
self.workers.len()
}
}
impl Drop for ParallelPager {
fn drop(&mut self) {
self.workers.clear();
}
}
fn worker_main(rx: mpsc::Receiver<Job>) {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
tracing::error!("[pager-worker] failed to build tokio runtime: {e}");
return;
}
};
while let Ok(job) = rx.recv() {
let begin = Instant::now();
let url = job.url.clone();
let profile = job.profile;
let result: NavigateResult = rt.block_on(async move {
match Page::from_html("<html><head></head><body></body></html>", profile).await {
Ok(mut page) => match page.navigate(&url).await {
Ok(()) => NavigateResult {
html: page.content(),
elapsed: begin.elapsed(),
error: None,
},
Err(e) => NavigateResult {
html: String::new(),
elapsed: begin.elapsed(),
error: Some(format!("{e}")),
},
},
Err(e) => NavigateResult {
html: String::new(),
elapsed: begin.elapsed(),
error: Some(format!("{e}")),
},
}
});
let _ = job.result_tx.send(result);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn parallel_pager_spawns_and_drops_cleanly() {
let pager = ParallelPager::new(2);
assert_eq!(pager.num_workers(), 2);
drop(pager);
}
#[tokio::test]
async fn parallel_navigate_returns_result() {
let pager = ParallelPager::new(1);
let result = pager.navigate("about:blank", None).await;
assert!(result.elapsed.as_nanos() > 0, "job should have run");
drop(pager);
}
}