1use std::{
13 sync::{
14 atomic::{AtomicUsize, Ordering},
15 mpsc,
16 },
17 thread,
18 time::{Duration, Instant},
19};
20
21use crate::{page::Page, stealth::StealthProfile};
22
23pub struct NavigateResult {
25 pub html: String,
27 pub elapsed: Duration,
29 pub error: Option<String>,
31}
32
33struct Job {
35 url: String,
36 profile: Option<StealthProfile>,
37 result_tx: tokio::sync::oneshot::Sender<NavigateResult>,
38}
39
40struct WorkerHandle {
41 tx: mpsc::Sender<Job>,
42 _thread: thread::JoinHandle<()>,
43}
44
45pub struct ParallelPager {
48 workers: Vec<WorkerHandle>,
49 next_worker: AtomicUsize,
50}
51
52impl ParallelPager {
53 pub fn new(num_workers: usize) -> Self {
55 assert!(num_workers > 0, "ParallelPager needs at least 1 worker");
56 let workers = (0..num_workers)
57 .map(|i| {
58 let (tx, rx) = mpsc::channel::<Job>();
59 let thread = thread::Builder::new()
60 .name(format!("hpx-browser-pager-{i}"))
61 .stack_size(64 * 1024 * 1024)
62 .spawn(move || worker_main(rx))
63 .unwrap_or_else(|e| panic!("failed to spawn pager worker: {e}"));
64 WorkerHandle {
65 tx,
66 _thread: thread,
67 }
68 })
69 .collect();
70 Self {
71 workers,
72 next_worker: AtomicUsize::new(0),
73 }
74 }
75
76 pub async fn navigate(
78 &self,
79 url: impl Into<String>,
80 profile: Option<StealthProfile>,
81 ) -> NavigateResult {
82 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
83 let job = Job {
84 url: url.into(),
85 profile,
86 result_tx,
87 };
88
89 let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.workers.len();
90 match self.workers[idx].tx.send(job) {
91 Ok(_) => {}
92 Err(send_err) => {
93 return NavigateResult {
94 html: String::new(),
95 elapsed: Duration::default(),
96 error: Some(format!(
97 "worker {idx} unavailable (likely panicked): {send_err}"
98 )),
99 };
100 }
101 }
102 match result_rx.await {
103 Ok(r) => r,
104 Err(_) => NavigateResult {
105 html: String::new(),
106 elapsed: Duration::default(),
107 error: Some("worker dropped result sender (panic during navigate)".to_string()),
108 },
109 }
110 }
111
112 pub fn num_workers(&self) -> usize {
114 self.workers.len()
115 }
116}
117
118impl Drop for ParallelPager {
119 fn drop(&mut self) {
120 self.workers.clear();
121 }
122}
123
124fn worker_main(rx: mpsc::Receiver<Job>) {
125 let rt = match tokio::runtime::Builder::new_current_thread()
126 .enable_all()
127 .build()
128 {
129 Ok(rt) => rt,
130 Err(e) => {
131 tracing::error!("[pager-worker] failed to build tokio runtime: {e}");
132 return;
133 }
134 };
135
136 while let Ok(job) = rx.recv() {
137 let begin = Instant::now();
138 let url = job.url.clone();
139 let profile = job.profile;
140
141 let result: NavigateResult = rt.block_on(async move {
142 match Page::from_html("<html><head></head><body></body></html>", profile).await {
143 Ok(mut page) => match page.navigate(&url).await {
144 Ok(()) => NavigateResult {
145 html: page.content(),
146 elapsed: begin.elapsed(),
147 error: None,
148 },
149 Err(e) => NavigateResult {
150 html: String::new(),
151 elapsed: begin.elapsed(),
152 error: Some(format!("{e}")),
153 },
154 },
155 Err(e) => NavigateResult {
156 html: String::new(),
157 elapsed: begin.elapsed(),
158 error: Some(format!("{e}")),
159 },
160 }
161 });
162
163 let _ = job.result_tx.send(result);
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170
171 #[tokio::test]
172 async fn parallel_pager_spawns_and_drops_cleanly() {
173 let pager = ParallelPager::new(2);
174 assert_eq!(pager.num_workers(), 2);
175 drop(pager);
176 }
177
178 #[tokio::test]
179 async fn parallel_navigate_returns_result() {
180 let pager = ParallelPager::new(1);
181 let result = pager.navigate("about:blank", None).await;
183 assert!(result.elapsed.as_nanos() > 0, "job should have run");
184 drop(pager);
185 }
186}