1use core_affinity::CoreId;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::{mpsc, Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8use std::time::Instant;
9use tokio::sync::oneshot;
10
11use super::{renderer, runtime};
12
13#[derive(Debug, Clone)]
15pub struct V8PoolConfig {
16 pub num_threads: usize,
18
19 pub queue_capacity: usize,
21
22 pub pin_threads: bool,
24
25 pub request_timeout: Option<Duration>,
27
28 pub render_function: String,
30}
31
32impl Default for V8PoolConfig {
33 fn default() -> Self {
34 Self {
35 num_threads: num_cpus::get(),
36 queue_capacity: 512,
37 pin_threads: false,
38 request_timeout: Some(Duration::from_secs(30)),
39 render_function: "renderPage".to_string(),
40 }
41 }
42}
43
44struct RenderRequest {
46 url: String,
47 data: String,
48 render_function: String,
49 response_tx: oneshot::Sender<Result<String, String>>,
50}
51
52#[derive(Debug, Clone)]
54pub enum PoolError {
55 Timeout,
57 Disconnected,
59 WorkerCrashed,
61 Render(String),
63}
64
65impl std::fmt::Display for PoolError {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 match self {
68 PoolError::Timeout => write!(f, "Timed out waiting for a free V8 worker"),
69 PoolError::Disconnected => write!(f, "V8 pool is not accepting requests"),
70 PoolError::WorkerCrashed => write!(f, "V8 worker crashed while rendering"),
71 PoolError::Render(msg) => write!(f, "{}", msg),
72 }
73 }
74}
75
76impl std::error::Error for PoolError {}
77
78pub struct V8Pool {
94 config: V8PoolConfig,
95 request_tx: mpsc::SyncSender<RenderRequest>,
96 #[allow(dead_code)]
97 request_rx: Arc<Mutex<mpsc::Receiver<RenderRequest>>>,
98 worker_count: Arc<Mutex<usize>>,
99 #[allow(dead_code)]
100 core_affinity: Option<Arc<Vec<CoreId>>>,
101 #[allow(dead_code)]
102 next_core: Arc<AtomicUsize>,
103}
104
105impl V8Pool {
106 pub fn new(config: V8PoolConfig) -> Self {
108 tracing::info!("🔧 Creating V8 pool with {} threads", config.num_threads);
109
110 let (request_tx, request_rx) = mpsc::sync_channel(config.queue_capacity);
111 let request_rx = Arc::new(Mutex::new(request_rx));
112 let worker_count = Arc::new(Mutex::new(0));
113
114 let core_affinity = if config.pin_threads {
115 core_affinity::get_core_ids().map(Arc::new)
116 } else {
117 None
118 };
119
120 let pool = Self {
121 config: config.clone(),
122 request_tx,
123 request_rx: Arc::clone(&request_rx),
124 worker_count: Arc::clone(&worker_count),
125 core_affinity: core_affinity.clone(),
126 next_core: Arc::new(AtomicUsize::new(0)),
127 };
128
129 for i in 0..config.num_threads {
131 spawn_worker(
132 i,
133 Arc::clone(&request_rx),
134 Arc::clone(&worker_count),
135 core_affinity.clone(),
136 Arc::clone(&pool.next_core),
137 );
138 }
139
140 tracing::info!("✅ Started {} V8 workers", config.num_threads);
141
142 pool
143 }
144
145 pub async fn render(&self, url: String) -> Result<String, PoolError> {
147 self.render_with_data(url, "{}".to_string()).await
148 }
149
150 pub async fn render_with_data(&self, url: String, data: String) -> Result<String, PoolError> {
152 let (response_tx, response_rx) = oneshot::channel();
153
154 let request = RenderRequest {
155 url,
156 data,
157 render_function: self.config.render_function.clone(),
158 response_tx,
159 };
160
161 let deadline = self.config.request_timeout.map(|t| Instant::now() + t);
162 let mut req = request;
163
164 loop {
165 match self.request_tx.try_send(req) {
166 Ok(()) => break,
167 Err(mpsc::TrySendError::Full(r)) => {
168 if let Some(dl) = deadline {
169 if Instant::now() >= dl {
170 return Err(PoolError::Timeout);
171 }
172 }
173 req = r;
174 tokio::task::yield_now().await;
175 continue;
176 }
177 Err(mpsc::TrySendError::Disconnected(_)) => {
178 return Err(PoolError::Disconnected);
179 }
180 }
181 }
182
183 match response_rx.await {
184 Ok(Ok(html)) => Ok(html),
185 Ok(Err(msg)) => Err(PoolError::Render(msg)),
186 Err(_) => Err(PoolError::WorkerCrashed),
187 }
188 }
189
190 pub fn worker_count(&self) -> usize {
192 *self.worker_count.lock().unwrap()
193 }
194
195 pub fn config(&self) -> &V8PoolConfig {
197 &self.config
198 }
199}
200
201impl Drop for V8Pool {
202 fn drop(&mut self) {
203 tracing::info!("🛑 Shutting down V8 pool");
204 }
206}
207
208fn spawn_worker(
210 id: usize,
211 request_rx: Arc<Mutex<mpsc::Receiver<RenderRequest>>>,
212 worker_count: Arc<Mutex<usize>>,
213 core_affinity: Option<Arc<Vec<CoreId>>>,
214 next_core: Arc<AtomicUsize>,
215) {
216 {
218 let mut count = worker_count.lock().unwrap();
219 *count += 1;
220 }
221
222 thread::spawn(move || {
223 tracing::debug!("🟢 V8 worker {} started", id);
224
225 if let Some(cores) = core_affinity {
227 let idx = next_core.fetch_add(1, Ordering::Relaxed) % cores.len();
228 if let Some(core_id) = cores.get(idx) {
229 if core_affinity::set_for_current(*core_id) {
230 tracing::debug!("📌 Worker {} pinned to core {:?}", id, core_id.id);
231 }
232 }
233 }
234
235 if let Err(e) = runtime::init_runtime() {
237 tracing::error!("❌ Failed to initialize V8 for worker {}: {}", id, e);
238 let mut count = worker_count.lock().unwrap();
239 *count -= 1;
240 return;
241 }
242
243 let mut requests_processed = 0usize;
244
245 loop {
247 let request = {
248 let rx = request_rx.lock().unwrap();
249 match rx.recv() {
250 Ok(req) => Some(req),
251 Err(_) => {
252 tracing::debug!("🔴 Worker {} channel disconnected", id);
253 break;
254 }
255 }
256 };
257
258 if let Some(req) = request {
259 prefetch_data(&req.data);
261
262 let result = runtime::with_runtime(|js_runtime| {
264 renderer::render_html(
265 &req.url,
266 Some(&req.data),
267 &req.render_function,
268 js_runtime,
269 )
270 });
271
272 let _ = req.response_tx.send(result);
274
275 requests_processed += 1;
276 }
277 }
278
279 tracing::debug!(
280 "🔴 Worker {} stopped (processed {} requests)",
281 id,
282 requests_processed
283 );
284
285 let mut count = worker_count.lock().unwrap();
287 *count -= 1;
288 });
289}
290
291#[inline]
293fn prefetch_data(data: &str) {
294 #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
295 {
296 unsafe {
297 use core::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
298 _mm_prefetch(data.as_ptr() as *const i8, _MM_HINT_T0);
299 }
300 }
301
302 #[cfg(not(any(target_arch = "x86", target_arch = "x86_64")))]
303 {
304 let _ = data.len();
306 }
307}
308
309impl V8Pool {
310 #[allow(dead_code)]
312 pub fn new_stub_with(config: V8PoolConfig) -> Self {
313 let (request_tx, request_rx) = mpsc::sync_channel(config.queue_capacity);
314 Self {
315 config,
316 request_tx,
317 request_rx: Arc::new(Mutex::new(request_rx)),
318 worker_count: Arc::new(Mutex::new(0)),
319 core_affinity: None,
320 next_core: Arc::new(AtomicUsize::new(0)),
321 }
322 }
323
324 #[allow(dead_code)]
326 pub fn new_stub() -> Self {
327 Self::new_stub_with(V8PoolConfig {
328 num_threads: 0,
329 queue_capacity: 0,
330 pin_threads: false,
331 request_timeout: Some(Duration::from_millis(10)),
332 render_function: "renderPage".to_string(),
333 })
334 }
335}