1use std::{
13 sync::{
14 atomic::{AtomicUsize, Ordering},
15 mpsc,
16 },
17 thread,
18 time::{Duration, Instant},
19};
20
21use crate::{page::Page, stealth::StealthProfile};
22
23pub struct NavigateResult {
25 pub html: String,
27 pub elapsed: Duration,
29 pub error: Option<String>,
31}
32
33struct Job {
35 url: String,
36 profile: Option<StealthProfile>,
37 result_tx: tokio::sync::oneshot::Sender<NavigateResult>,
38}
39
40struct WorkerHandle {
41 tx: mpsc::Sender<Job>,
42 _thread: thread::JoinHandle<()>,
43}
44
45pub struct ParallelPager {
48 workers: Vec<WorkerHandle>,
49 next_worker: AtomicUsize,
50}
51
52impl ParallelPager {
53 pub fn new(num_workers: usize) -> Self {
55 assert!(num_workers > 0, "ParallelPager needs at least 1 worker");
56 let workers = (0..num_workers)
57 .map(|i| {
58 let (tx, rx) = mpsc::channel::<Job>();
59 let thread = thread::Builder::new()
60 .name(format!("hpx-browser-pager-{i}"))
61 .stack_size(64 * 1024 * 1024)
62 .spawn(move || worker_main(rx))
63 .unwrap_or_else(|e| panic!("failed to spawn pager worker: {e}"));
64 WorkerHandle {
65 tx,
66 _thread: thread,
67 }
68 })
69 .collect();
70 Self {
71 workers,
72 next_worker: AtomicUsize::new(0),
73 }
74 }
75
76 pub async fn navigate(
78 &self,
79 url: impl Into<String>,
80 profile: Option<StealthProfile>,
81 ) -> NavigateResult {
82 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
83 let job = Job {
84 url: url.into(),
85 profile,
86 result_tx,
87 };
88
89 let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.workers.len();
90 match self.workers[idx].tx.send(job) {
91 Ok(_) => {}
92 Err(send_err) => {
93 return NavigateResult {
94 html: String::new(),
95 elapsed: Duration::default(),
96 error: Some(format!(
97 "worker {idx} unavailable (likely panicked): {send_err}"
98 )),
99 };
100 }
101 }
102 match result_rx.await {
103 Ok(r) => r,
104 Err(_) => NavigateResult {
105 html: String::new(),
106 elapsed: Duration::default(),
107 error: Some("worker dropped result sender (panic during navigate)".to_string()),
108 },
109 }
110 }
111
112 pub fn num_workers(&self) -> usize {
114 self.workers.len()
115 }
116}
117
118impl Drop for ParallelPager {
119 fn drop(&mut self) {
120 self.workers.clear();
121 }
122}
123
124fn worker_main(rx: mpsc::Receiver<Job>) {
125 let rt = match tokio::runtime::Builder::new_current_thread()
126 .enable_all()
127 .build()
128 {
129 Ok(rt) => rt,
130 Err(e) => {
131 tracing::error!("[pager-worker] failed to build tokio runtime: {e}");
132 return;
133 }
134 };
135
136 while let Ok(job) = rx.recv() {
137 let begin = Instant::now();
138 let url = job.url.clone();
139 let profile = job.profile;
140
141 let result: NavigateResult = rt.block_on(async move {
142 match Page::from_html("<html><head></head><body></body></html>", profile.is_some())
143 .await
144 {
145 Ok(mut page) => match page.navigate(&url).await {
146 Ok(()) => NavigateResult {
147 html: page.content(),
148 elapsed: begin.elapsed(),
149 error: None,
150 },
151 Err(e) => NavigateResult {
152 html: String::new(),
153 elapsed: begin.elapsed(),
154 error: Some(format!("{e}")),
155 },
156 },
157 Err(e) => NavigateResult {
158 html: String::new(),
159 elapsed: begin.elapsed(),
160 error: Some(format!("{e}")),
161 },
162 }
163 });
164
165 let _ = job.result_tx.send(result);
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use super::*;
172
173 #[tokio::test]
174 async fn parallel_pager_spawns_and_drops_cleanly() {
175 let pager = ParallelPager::new(2);
176 assert_eq!(pager.num_workers(), 2);
177 drop(pager);
178 }
179
180 #[tokio::test]
181 async fn parallel_navigate_returns_result() {
182 let pager = ParallelPager::new(1);
183 let result = pager.navigate("about:blank", None).await;
185 assert!(result.elapsed.as_nanos() > 0, "job should have run");
186 drop(pager);
187 }
188}