Skip to main content

rusty_ssr/v8_pool/
pool.rs

1//! V8 Thread Pool implementation
2
3use core_affinity::CoreId;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::{mpsc, Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8use std::time::Instant;
9use tokio::sync::oneshot;
10
11use super::{renderer, runtime};
12
/// Tunable settings used to build a V8 thread pool.
///
/// All fields are public so a caller can start from `V8PoolConfig::default()`
/// and override individual values.
#[derive(Debug, Clone)]
pub struct V8PoolConfig {
    /// How many worker threads the pool spawns.
    pub num_threads: usize,

    /// Bound of the request queue shared by all workers.
    pub queue_capacity: usize,

    /// When `true`, workers are pinned to CPU cores where core IDs are available.
    pub pin_threads: bool,

    /// Maximum time a request may wait for a free queue slot.
    ///
    /// `None` means the request keeps waiting until a slot frees up.
    pub request_timeout: Option<Duration>,

    /// Name of the JavaScript function that performs the render.
    pub render_function: String,
}
31
32impl Default for V8PoolConfig {
33    fn default() -> Self {
34        Self {
35            num_threads: num_cpus::get(),
36            queue_capacity: 512,
37            pin_threads: false,
38            request_timeout: Some(Duration::from_secs(30)),
39            render_function: "renderPage".to_string(),
40        }
41    }
42}
43
44/// Internal render request
45struct RenderRequest {
46    url: String,
47    data: String,
48    render_function: String,
49    response_tx: oneshot::Sender<Result<String, String>>,
50}
51
/// Failure modes reported by the V8 pool's render methods.
#[derive(Debug, Clone)]
pub enum PoolError {
    /// The request could not be queued before the enqueue deadline passed.
    Timeout,
    /// The request channel is disconnected, so no new work can be queued.
    Disconnected,
    /// The response channel was dropped before a result was sent back.
    WorkerCrashed,
    /// The renderer reported an error; carries its message.
    Render(String),
}

impl std::fmt::Display for PoolError {
    /// Writes the human-readable message for the variant. `Render` prints the
    /// wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message: &str = match self {
            Self::Timeout => "Timed out waiting for a free V8 worker",
            Self::Disconnected => "V8 pool is not accepting requests",
            Self::WorkerCrashed => "V8 worker crashed while rendering",
            Self::Render(msg) => msg.as_str(),
        };
        f.write_str(message)
    }
}

// `Debug` and `Display` above are all `Error` needs; no source error is exposed.
impl std::error::Error for PoolError {}
77
78/// V8 Thread Pool for parallel SSR rendering
79///
80/// Each worker thread has its own V8 isolate, solving the `!Send + !Sync`
81/// problem of V8 runtimes.
82///
83/// # Example
84/// ```rust,ignore
85/// use rusty_ssr::v8_pool::{V8Pool, V8PoolConfig};
86///
87/// #[tokio::main]
88/// async fn main() {
89///     let pool = V8Pool::new(V8PoolConfig::default());
90///     let html = pool.render("https://example.com/page".to_string()).await;
91/// }
92/// ```
93pub struct V8Pool {
94    config: V8PoolConfig,
95    request_tx: mpsc::SyncSender<RenderRequest>,
96    #[allow(dead_code)]
97    request_rx: Arc<Mutex<mpsc::Receiver<RenderRequest>>>,
98    worker_count: Arc<Mutex<usize>>,
99    #[allow(dead_code)]
100    core_affinity: Option<Arc<Vec<CoreId>>>,
101    #[allow(dead_code)]
102    next_core: Arc<AtomicUsize>,
103}
104
105impl V8Pool {
106    /// Create a new V8 thread pool
107    pub fn new(config: V8PoolConfig) -> Self {
108        tracing::info!("🔧 Creating V8 pool with {} threads", config.num_threads);
109
110        let (request_tx, request_rx) = mpsc::sync_channel(config.queue_capacity);
111        let request_rx = Arc::new(Mutex::new(request_rx));
112        let worker_count = Arc::new(Mutex::new(0));
113
114        let core_affinity = if config.pin_threads {
115            core_affinity::get_core_ids().map(Arc::new)
116        } else {
117            None
118        };
119
120        let pool = Self {
121            config: config.clone(),
122            request_tx,
123            request_rx: Arc::clone(&request_rx),
124            worker_count: Arc::clone(&worker_count),
125            core_affinity: core_affinity.clone(),
126            next_core: Arc::new(AtomicUsize::new(0)),
127        };
128
129        // Spawn worker threads
130        for i in 0..config.num_threads {
131            spawn_worker(
132                i,
133                Arc::clone(&request_rx),
134                Arc::clone(&worker_count),
135                core_affinity.clone(),
136                Arc::clone(&pool.next_core),
137            );
138        }
139
140        tracing::info!("✅ Started {} V8 workers", config.num_threads);
141
142        pool
143    }
144
145    /// Render a URL to HTML
146    pub async fn render(&self, url: String) -> Result<String, PoolError> {
147        self.render_with_data(url, "{}".to_string()).await
148    }
149
150    /// Render a URL to HTML with custom data
151    pub async fn render_with_data(&self, url: String, data: String) -> Result<String, PoolError> {
152        let (response_tx, response_rx) = oneshot::channel();
153
154        let request = RenderRequest {
155            url,
156            data,
157            render_function: self.config.render_function.clone(),
158            response_tx,
159        };
160
161        let deadline = self.config.request_timeout.map(|t| Instant::now() + t);
162        let mut req = request;
163
164        loop {
165            match self.request_tx.try_send(req) {
166                Ok(()) => break,
167                Err(mpsc::TrySendError::Full(r)) => {
168                    if let Some(dl) = deadline {
169                        if Instant::now() >= dl {
170                            return Err(PoolError::Timeout);
171                        }
172                    }
173                    req = r;
174                    tokio::task::yield_now().await;
175                    continue;
176                }
177                Err(mpsc::TrySendError::Disconnected(_)) => {
178                    return Err(PoolError::Disconnected);
179                }
180            }
181        }
182
183        match response_rx.await {
184            Ok(Ok(html)) => Ok(html),
185            Ok(Err(msg)) => Err(PoolError::Render(msg)),
186            Err(_) => Err(PoolError::WorkerCrashed),
187        }
188    }
189
190    /// Get the number of active workers
191    pub fn worker_count(&self) -> usize {
192        *self.worker_count.lock().unwrap()
193    }
194
195    /// Get the pool configuration
196    pub fn config(&self) -> &V8PoolConfig {
197        &self.config
198    }
199}
200
201impl Drop for V8Pool {
202    fn drop(&mut self) {
203        tracing::info!("🛑 Shutting down V8 pool");
204        // Channels will be dropped, workers will receive disconnect and exit
205    }
206}
207
208/// Spawn a worker thread
209fn spawn_worker(
210    id: usize,
211    request_rx: Arc<Mutex<mpsc::Receiver<RenderRequest>>>,
212    worker_count: Arc<Mutex<usize>>,
213    core_affinity: Option<Arc<Vec<CoreId>>>,
214    next_core: Arc<AtomicUsize>,
215) {
216    // Increment worker count
217    {
218        let mut count = worker_count.lock().unwrap();
219        *count += 1;
220    }
221
222    thread::spawn(move || {
223        tracing::debug!("🟢 V8 worker {} started", id);
224
225        // Pin to CPU core if requested
226        if let Some(cores) = core_affinity {
227            let idx = next_core.fetch_add(1, Ordering::Relaxed) % cores.len();
228            if let Some(core_id) = cores.get(idx) {
229                if core_affinity::set_for_current(*core_id) {
230                    tracing::debug!("📌 Worker {} pinned to core {:?}", id, core_id.id);
231                }
232            }
233        }
234
235        // Initialize V8 runtime for this thread
236        if let Err(e) = runtime::init_runtime() {
237            tracing::error!("❌ Failed to initialize V8 for worker {}: {}", id, e);
238            let mut count = worker_count.lock().unwrap();
239            *count -= 1;
240            return;
241        }
242
243        let mut requests_processed = 0usize;
244
245        // Main worker loop
246        loop {
247            let request = {
248                let rx = request_rx.lock().unwrap();
249                match rx.recv() {
250                    Ok(req) => Some(req),
251                    Err(_) => {
252                        tracing::debug!("🔴 Worker {} channel disconnected", id);
253                        break;
254                    }
255                }
256            };
257
258            if let Some(req) = request {
259                // Prefetch data for better cache performance
260                prefetch_data(&req.data);
261
262                // Render via V8
263                let result = runtime::with_runtime(|js_runtime| {
264                    renderer::render_html(
265                        &req.url,
266                        Some(&req.data),
267                        &req.render_function,
268                        js_runtime,
269                    )
270                });
271
272                // Send response
273                let _ = req.response_tx.send(result);
274
275                requests_processed += 1;
276            }
277        }
278
279        tracing::debug!(
280            "🔴 Worker {} stopped (processed {} requests)",
281            id,
282            requests_processed
283        );
284
285        // Decrement worker count
286        let mut count = worker_count.lock().unwrap();
287        *count -= 1;
288    });
289}
290
/// Hint the CPU to pull the start of `data` into its caches.
///
/// On x86_64, and on 32-bit x86 builds with SSE enabled, this issues one
/// `_mm_prefetch` with the `T0` hint for the address of the first byte of
/// `data`. On every other target it does nothing. The call is a pure
/// performance hint and has no observable result.
#[inline]
fn prefetch_data(data: &str) {
    #[cfg(any(
        target_arch = "x86_64",
        all(target_arch = "x86", target_feature = "sse")
    ))]
    {
        // The intrinsic lives in a different module per architecture;
        // `core::arch::x86_64` does not exist when compiling for 32-bit x86.
        #[cfg(target_arch = "x86")]
        use core::arch::x86::{_mm_prefetch, _MM_HINT_T0};
        #[cfg(target_arch = "x86_64")]
        use core::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};

        // SAFETY: the surrounding `cfg` guarantees SSE is available, and a
        // prefetch is only a hint: it does not read through the pointer in a
        // way that can fault, so any address (including that of an empty
        // string) is acceptable.
        unsafe {
            _mm_prefetch(data.as_ptr().cast::<i8>(), _MM_HINT_T0);
        }
    }

    #[cfg(not(any(
        target_arch = "x86_64",
        all(target_arch = "x86", target_feature = "sse")
    )))]
    {
        // No-op prefetch for other architectures
        let _ = data;
    }
}
308
309impl V8Pool {
310    /// Create a stub pool for testing (no actual V8)
311    #[allow(dead_code)]
312    pub fn new_stub_with(config: V8PoolConfig) -> Self {
313        let (request_tx, request_rx) = mpsc::sync_channel(config.queue_capacity);
314        Self {
315            config,
316            request_tx,
317            request_rx: Arc::new(Mutex::new(request_rx)),
318            worker_count: Arc::new(Mutex::new(0)),
319            core_affinity: None,
320            next_core: Arc::new(AtomicUsize::new(0)),
321        }
322    }
323
324    /// Create a stub pool with default test config (no workers)
325    #[allow(dead_code)]
326    pub fn new_stub() -> Self {
327        Self::new_stub_with(V8PoolConfig {
328            num_threads: 0,
329            queue_capacity: 0,
330            pin_threads: false,
331            request_timeout: Some(Duration::from_millis(10)),
332            render_function: "renderPage".to_string(),
333        })
334    }
335}