use core_affinity::CoreId;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::oneshot;
use super::{renderer, runtime};
/// Configuration consumed by `V8Pool::new` and read back through `V8Pool::config`.
#[derive(Debug, Clone)]
pub struct V8PoolConfig {
/// Number of worker threads `V8Pool::new` spawns.
pub num_threads: usize,
/// Capacity handed to `mpsc::sync_channel` for the queue of pending render requests.
pub queue_capacity: usize,
/// When `true`, `V8Pool::new` queries `core_affinity::get_core_ids()` and each worker
/// tries to pin itself to one of the returned cores.
pub pin_threads: bool,
/// Upper bound on how long `render_with_data` keeps retrying while the request queue is
/// full. `None` means no deadline is applied to that retry loop.
pub request_timeout: Option<Duration>,
/// Copied into every render request and forwarded to `renderer::render_html`.
/// NOTE(review): presumably the name of the JS entry point to call — confirm in `renderer`.
pub render_function: String,
}
impl Default for V8PoolConfig {
    /// Default configuration: one worker per CPU reported by `num_cpus::get()`, a queue of
    /// 512 pending requests, no thread pinning, a 30 second timeout and `"renderPage"` as
    /// the render function.
    fn default() -> Self {
        let worker_threads = num_cpus::get();
        let timeout = Duration::from_secs(30);

        Self {
            num_threads: worker_threads,
            queue_capacity: 512,
            pin_threads: false,
            request_timeout: Some(timeout),
            render_function: String::from("renderPage"),
        }
    }
}
/// One unit of work sent from `V8Pool::render_with_data` to a worker thread.
struct RenderRequest {
/// Forwarded to `renderer::render_html` as its first argument.
url: String,
/// Forwarded to `renderer::render_html` as `Some(&data)`; `V8Pool::render` sends `"{}"`.
data: String,
/// Copy of `V8PoolConfig::render_function` taken when the request was created.
render_function: String,
/// One-shot channel on which the worker sends the render result back to the caller.
response_tx: oneshot::Sender<Result<String, String>>,
}
/// Failures reported by `V8Pool::render` / `V8Pool::render_with_data`.
#[derive(Debug, Clone)]
pub enum PoolError {
    /// The request queue stayed full until the configured deadline passed.
    Timeout,
    /// The request channel reported that its receiving side is gone.
    Disconnected,
    /// The response channel was closed without a result being sent.
    WorkerCrashed,
    /// The worker replied with an error; the payload is the message it sent back.
    Render(String),
}

impl std::fmt::Display for PoolError {
    /// Writes a fixed description for each variant, or the carried message verbatim for
    /// [`PoolError::Render`]. Width/fill flags of the outer formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = match self {
            Self::Timeout => "Timed out waiting for a free V8 worker",
            Self::Disconnected => "V8 pool is not accepting requests",
            Self::WorkerCrashed => "V8 worker crashed while rendering",
            Self::Render(msg) => msg.as_str(),
        };
        f.write_str(text)
    }
}

impl std::error::Error for PoolError {}
/// Handle to a pool of worker threads that serve render requests from a bounded queue.
pub struct V8Pool {
/// Configuration the pool was created with.
config: V8PoolConfig,
/// Sending side of the bounded request queue; `render_with_data` enqueues through it.
request_tx: mpsc::SyncSender<RenderRequest>,
/// Receiving side of the request queue, shared with every worker behind a mutex.
/// Because the pool keeps its own handle, the receiver stays alive for as long as the
/// pool does, even when no worker is running.
#[allow(dead_code)]
request_rx: Arc<Mutex<mpsc::Receiver<RenderRequest>>>,
/// Worker counter: incremented by `spawn_worker` before the thread starts and decremented
/// when a worker fails runtime initialization or leaves its request loop.
worker_count: Arc<Mutex<usize>>,
/// Core ids returned by `core_affinity::get_core_ids()`; only populated when
/// `pin_threads` is enabled and the lookup succeeded.
#[allow(dead_code)]
core_affinity: Option<Arc<Vec<CoreId>>>,
/// Shared counter workers advance to pick their index into `core_affinity`.
#[allow(dead_code)]
next_core: Arc<AtomicUsize>,
}
impl V8Pool {
    /// Creates the pool and immediately spawns `config.num_threads` worker threads.
    ///
    /// The request queue is a bounded `mpsc::sync_channel` of `config.queue_capacity`
    /// entries. When `config.pin_threads` is set, the available core ids are looked up
    /// once and shared with all workers.
    ///
    /// Worker start-up is asynchronous: a worker whose runtime initialization fails logs
    /// the error and exits on its own, so the "started" log line reports the number of
    /// workers spawned, not the number that initialized successfully.
    pub fn new(config: V8PoolConfig) -> Self {
        tracing::info!("🔧 Creating V8 pool with {} threads", config.num_threads);

        let (request_tx, request_rx) = mpsc::sync_channel(config.queue_capacity);
        let request_rx = Arc::new(Mutex::new(request_rx));
        let worker_count = Arc::new(Mutex::new(0));
        let next_core = Arc::new(AtomicUsize::new(0));

        // An empty core list is treated like a failed lookup: workers pick their core with
        // `counter % cores.len()`, which would panic with a division by zero.
        let core_affinity = if config.pin_threads {
            core_affinity::get_core_ids()
                .filter(|ids| !ids.is_empty())
                .map(Arc::new)
        } else {
            None
        };

        let num_threads = config.num_threads;
        for i in 0..num_threads {
            spawn_worker(
                i,
                Arc::clone(&request_rx),
                Arc::clone(&worker_count),
                core_affinity.clone(),
                Arc::clone(&next_core),
            );
        }
        tracing::info!("✅ Started {} V8 workers", num_threads);

        // The handles are moved in here instead of being cloned a second time.
        Self {
            config,
            request_tx,
            request_rx,
            worker_count,
            core_affinity,
            next_core,
        }
    }

    /// Renders `url` with an empty JSON object (`"{}"`) as data.
    ///
    /// # Errors
    ///
    /// Same as [`V8Pool::render_with_data`].
    pub async fn render(&self, url: String) -> Result<String, PoolError> {
        self.render_with_data(url, "{}".to_string()).await
    }

    /// Queues a render request for `url` with `data` and waits for a worker's reply.
    ///
    /// While the queue is full the request is retried with `try_send`, yielding to the
    /// async scheduler between attempts. `config.request_timeout` bounds only this
    /// enqueue phase; once the request is queued, the wait for the reply has no deadline.
    ///
    /// # Errors
    ///
    /// * [`PoolError::Timeout`] — the queue was still full when the deadline passed.
    /// * [`PoolError::Disconnected`] — the channel reported that its receiver is gone.
    /// * [`PoolError::Render`] — the worker replied with an error message.
    /// * [`PoolError::WorkerCrashed`] — the reply channel closed without a result.
    pub async fn render_with_data(&self, url: String, data: String) -> Result<String, PoolError> {
        let (response_tx, response_rx) = oneshot::channel();
        let mut pending = RenderRequest {
            url,
            data,
            render_function: self.config.render_function.clone(),
            response_tx,
        };
        let deadline = self.config.request_timeout.map(|t| Instant::now() + t);

        loop {
            match self.request_tx.try_send(pending) {
                Ok(()) => break,
                Err(mpsc::TrySendError::Full(returned)) => {
                    if let Some(dl) = deadline {
                        if Instant::now() >= dl {
                            return Err(PoolError::Timeout);
                        }
                    }
                    // `try_send` hands the request back on failure; keep it for the retry.
                    pending = returned;
                    tokio::task::yield_now().await;
                }
                Err(mpsc::TrySendError::Disconnected(_)) => {
                    return Err(PoolError::Disconnected);
                }
            }
        }

        match response_rx.await {
            Ok(Ok(html)) => Ok(html),
            Ok(Err(msg)) => Err(PoolError::Render(msg)),
            // The sender was dropped without a reply being sent.
            Err(_) => Err(PoolError::WorkerCrashed),
        }
    }

    /// Returns the current value of the shared worker counter.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex is poisoned.
    pub fn worker_count(&self) -> usize {
        *self.worker_count.lock().unwrap()
    }

    /// Returns the configuration the pool was created with.
    pub fn config(&self) -> &V8PoolConfig {
        &self.config
    }
}
impl Drop for V8Pool {
// Only logs. The fields, including the request sender, are dropped by the compiler
// after this returns; a worker's `recv()` then fails once no sender is left and the
// queue is empty, which makes the worker leave its loop.
fn drop(&mut self) {
tracing::info!("🛑 Shutting down V8 pool");
}
}
/// Spawns one detached worker thread that serves render requests until the request
/// channel disconnects.
///
/// * `id` — worker number, used only in log messages.
/// * `request_rx` — shared receiving side of the request queue.
/// * `worker_count` — shared counter; incremented here before the thread starts and
///   decremented when the worker ends for any reason, including a panic.
/// * `core_affinity` — optional list of cores to pin to; an empty list is ignored.
/// * `next_core` — shared counter used to spread workers over `core_affinity`.
///
/// # Panics
///
/// Panics if the `worker_count` mutex is poisoned when the worker is registered.
fn spawn_worker(
    id: usize,
    request_rx: Arc<Mutex<mpsc::Receiver<RenderRequest>>>,
    worker_count: Arc<Mutex<usize>>,
    core_affinity: Option<Arc<Vec<CoreId>>>,
    next_core: Arc<AtomicUsize>,
) {
    /// Decrements the shared worker counter when dropped, so every exit path of the
    /// worker (normal shutdown, failed initialization, panic) is accounted for.
    struct WorkerCountGuard(Arc<Mutex<usize>>);

    impl Drop for WorkerCountGuard {
        fn drop(&mut self) {
            // This may run while the thread is unwinding; panicking again would abort the
            // process, so a poisoned lock is recovered rather than unwrapped.
            let mut count = match self.0.lock() {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            };
            *count = count.saturating_sub(1);
        }
    }

    // Register the worker before the thread starts so the count is visible immediately.
    {
        let mut count = worker_count.lock().unwrap();
        *count += 1;
    }
    let live_guard = WorkerCountGuard(worker_count);

    thread::spawn(move || {
        // Bound first so it is dropped last, after the final log line.
        let _live_guard = live_guard;
        tracing::debug!("🟢 V8 worker {} started", id);

        if let Some(cores) = core_affinity {
            // Skip pinning for an empty list: `% cores.len()` would divide by zero.
            if !cores.is_empty() {
                let idx = next_core.fetch_add(1, Ordering::Relaxed) % cores.len();
                if let Some(core_id) = cores.get(idx) {
                    if core_affinity::set_for_current(*core_id) {
                        tracing::debug!("📌 Worker {} pinned to core {:?}", id, core_id.id);
                    }
                }
            }
        }

        if let Err(e) = runtime::init_runtime() {
            tracing::error!("❌ Failed to initialize V8 for worker {}: {}", id, e);
            return;
        }

        let mut requests_processed = 0usize;
        loop {
            // The lock is held only while waiting for the next request and is released
            // at the end of this block, before rendering starts.
            let req = {
                let rx = request_rx.lock().unwrap();
                match rx.recv() {
                    Ok(req) => req,
                    Err(_) => {
                        tracing::debug!("🔴 Worker {} channel disconnected", id);
                        break;
                    }
                }
            };

            prefetch_data(&req.data);
            let result = runtime::with_runtime(|js_runtime| {
                renderer::render_html(
                    &req.url,
                    Some(&req.data),
                    &req.render_function,
                    js_runtime,
                )
            });
            // The caller may have stopped waiting; a failed send is not an error here.
            let _ = req.response_tx.send(result);
            requests_processed += 1;
        }

        tracing::debug!(
            "🔴 Worker {} stopped (processed {} requests)",
            id,
            requests_processed
        );
    });
}
/// Issues a cache prefetch hint for the start of `data`.
///
/// On x86 / x86_64 targets built with SSE this emits `_mm_prefetch` with the
/// `_MM_HINT_T0` hint for the address of the first byte. On every other target it does
/// nothing. The function has no observable effect on program state.
#[inline]
fn prefetch_data(data: &str) {
    #[cfg(all(
        any(target_arch = "x86", target_arch = "x86_64"),
        target_feature = "sse"
    ))]
    {
        // Each architecture exposes the intrinsic under its own module; importing the
        // x86_64 path unconditionally does not compile on 32-bit x86.
        #[cfg(target_arch = "x86")]
        use core::arch::x86::{_mm_prefetch, _MM_HINT_T0};
        #[cfg(target_arch = "x86_64")]
        use core::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};

        // SAFETY: the surrounding `cfg` guarantees the SSE feature the intrinsic needs is
        // enabled, and a prefetch is only a hint: it does not read through the pointer
        // architecturally, so any address (including that of an empty string) is fine.
        unsafe {
            _mm_prefetch(data.as_ptr() as *const i8, _MM_HINT_T0);
        }
    }
    #[cfg(not(all(
        any(target_arch = "x86", target_arch = "x86_64"),
        target_feature = "sse"
    )))]
    {
        // No prefetch intrinsic is used here; just mark the argument as used.
        let _ = data;
    }
}
impl V8Pool {
    /// Builds a pool from `config` without spawning any worker thread.
    ///
    /// The request channel is created with `config.queue_capacity`, the worker counter
    /// starts at zero and no core affinity information is stored.
    #[allow(dead_code)]
    pub fn new_stub_with(config: V8PoolConfig) -> Self {
        let (sender, receiver) = mpsc::sync_channel(config.queue_capacity);
        let shared_receiver = Arc::new(Mutex::new(receiver));
        let live_workers = Arc::new(Mutex::new(0));
        let core_cursor = Arc::new(AtomicUsize::new(0));

        Self {
            config,
            request_tx: sender,
            request_rx: shared_receiver,
            worker_count: live_workers,
            core_affinity: None,
            next_core: core_cursor,
        }
    }

    /// Builds a worker-less pool with zero threads, a zero-capacity queue, no pinning,
    /// a 10 ms timeout and `"renderPage"` as the render function.
    ///
    /// NOTE(review): presumably meant for tests — with nothing receiving on a
    /// zero-capacity queue, render calls are expected to time out; confirm with callers.
    #[allow(dead_code)]
    pub fn new_stub() -> Self {
        let stub_config = V8PoolConfig {
            num_threads: 0,
            queue_capacity: 0,
            pin_threads: false,
            request_timeout: Some(Duration::from_millis(10)),
            render_function: String::from("renderPage"),
        };
        Self::new_stub_with(stub_config)
    }
}