Skip to main content

hpx_browser/
parallel.rs

//! Parallel page navigator — N OS-thread worker pool.
//!
//! `Page` (and its embedded V8 isolate) is `!Send`. This module spawns
//! `N` dedicated OS threads, each with its own tokio current-thread
//! runtime and its own `Page` instances. Jobs are dispatched from the
//! caller over `std::sync::mpsc` channels. Results come back over
//! `tokio::sync::oneshot` so the caller can `.await` them naturally.
//!
//! Scheduling is round-robin; the caller-visible API is just
//! `pager.navigate(url, profile).await`.
11
12use std::{
13    sync::{
14        atomic::{AtomicUsize, Ordering},
15        mpsc,
16    },
17    thread,
18    time::{Duration, Instant},
19};
20
21use crate::{page::Page, stealth::StealthProfile};
22
/// Outcome of a single [`ParallelPager::navigate`] call.
///
/// On success `error` is `None` and `html` holds the page content; on any
/// failure `error` carries a message and `html` is empty.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NavigateResult {
    /// Final HTML as returned by `Page::content()`, or empty on error.
    pub html: String,
    /// Wall-clock time measured on the worker thread, from the moment it
    /// dequeued the job through navigation completion. Time the job spent
    /// waiting in the worker's queue is NOT included. `Duration::ZERO` when
    /// the job never produced an outcome (no worker available, or the
    /// worker dropped the result channel).
    pub elapsed: Duration,
    /// `Some(message)` if page creation or navigation failed, or if the job
    /// could not be delivered to / answered by a worker. `None` on success.
    pub error: Option<String>,
}
32
33/// Internal job message.
34struct Job {
35    url: String,
36    profile: Option<StealthProfile>,
37    result_tx: tokio::sync::oneshot::Sender<NavigateResult>,
38}
39
40struct WorkerHandle {
41    tx: mpsc::Sender<Job>,
42    _thread: thread::JoinHandle<()>,
43}
44
45/// N-worker parallel navigation pool. Each worker thread owns its own
46/// tokio runtime + its own `Page` instances.
47pub struct ParallelPager {
48    workers: Vec<WorkerHandle>,
49    next_worker: AtomicUsize,
50}
51
52impl ParallelPager {
53    /// Spawn `num_workers` OS threads.
54    pub fn new(num_workers: usize) -> Self {
55        assert!(num_workers > 0, "ParallelPager needs at least 1 worker");
56        let workers = (0..num_workers)
57            .map(|i| {
58                let (tx, rx) = mpsc::channel::<Job>();
59                let thread = thread::Builder::new()
60                    .name(format!("hpx-browser-pager-{i}"))
61                    .stack_size(64 * 1024 * 1024)
62                    .spawn(move || worker_main(rx))
63                    .unwrap_or_else(|e| panic!("failed to spawn pager worker: {e}"));
64                WorkerHandle {
65                    tx,
66                    _thread: thread,
67                }
68            })
69            .collect();
70        Self {
71            workers,
72            next_worker: AtomicUsize::new(0),
73        }
74    }
75
76    /// Dispatch a navigation to the next worker (round-robin).
77    pub async fn navigate(
78        &self,
79        url: impl Into<String>,
80        profile: Option<StealthProfile>,
81    ) -> NavigateResult {
82        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
83        let job = Job {
84            url: url.into(),
85            profile,
86            result_tx,
87        };
88
89        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.workers.len();
90        match self.workers[idx].tx.send(job) {
91            Ok(_) => {}
92            Err(send_err) => {
93                return NavigateResult {
94                    html: String::new(),
95                    elapsed: Duration::default(),
96                    error: Some(format!(
97                        "worker {idx} unavailable (likely panicked): {send_err}"
98                    )),
99                };
100            }
101        }
102        match result_rx.await {
103            Ok(r) => r,
104            Err(_) => NavigateResult {
105                html: String::new(),
106                elapsed: Duration::default(),
107                error: Some("worker dropped result sender (panic during navigate)".to_string()),
108            },
109        }
110    }
111
112    /// Number of workers spawned at construction time.
113    pub fn num_workers(&self) -> usize {
114        self.workers.len()
115    }
116}
117
118impl Drop for ParallelPager {
119    fn drop(&mut self) {
120        self.workers.clear();
121    }
122}
123
124fn worker_main(rx: mpsc::Receiver<Job>) {
125    let rt = match tokio::runtime::Builder::new_current_thread()
126        .enable_all()
127        .build()
128    {
129        Ok(rt) => rt,
130        Err(e) => {
131            tracing::error!("[pager-worker] failed to build tokio runtime: {e}");
132            return;
133        }
134    };
135
136    while let Ok(job) = rx.recv() {
137        let begin = Instant::now();
138        let url = job.url.clone();
139        let profile = job.profile;
140
141        let result: NavigateResult = rt.block_on(async move {
142            match Page::from_html("<html><head></head><body></body></html>", profile).await {
143                Ok(mut page) => match page.navigate(&url).await {
144                    Ok(()) => NavigateResult {
145                        html: page.content(),
146                        elapsed: begin.elapsed(),
147                        error: None,
148                    },
149                    Err(e) => NavigateResult {
150                        html: String::new(),
151                        elapsed: begin.elapsed(),
152                        error: Some(format!("{e}")),
153                    },
154                },
155                Err(e) => NavigateResult {
156                    html: String::new(),
157                    elapsed: begin.elapsed(),
158                    error: Some(format!("{e}")),
159                },
160            }
161        });
162
163        let _ = job.result_tx.send(result);
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn parallel_pager_spawns_and_drops_cleanly() {
        let pool = ParallelPager::new(2);
        assert_eq!(pool.num_workers(), 2);
        drop(pool);
    }

    #[tokio::test]
    async fn parallel_navigate_returns_result() {
        // "about:blank" is not an HTTP URI, so `Page::navigate` presumably
        // rejects it; the point is only that the job is dispatched to a
        // worker and answered without panicking.
        let pool = ParallelPager::new(1);
        let outcome = pool.navigate("about:blank", None).await;
        assert!(!outcome.elapsed.is_zero(), "job should have run");
        drop(pool);
    }
}