use anyhow::Result;
use crossbeam_channel::{bounded, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use tracing::{debug, error, warn};
type WorkItem = Box<dyn FnOnce(&mut crate::SsrIsolate) + Send + 'static>;
pub struct IsolatePool {
senders: Vec<Option<Sender<WorkItem>>>,
threads: Vec<Option<JoinHandle<()>>>,
next: std::sync::atomic::AtomicUsize,
size: usize,
}
impl IsolatePool {
pub fn new(
size: usize,
server_bundle_js: Arc<String>,
project_root: Option<Arc<String>>,
) -> Result<Self> {
let mut senders = Vec::with_capacity(size);
let mut threads = Vec::with_capacity(size);
for i in 0..size {
let (tx, rx) = bounded::<WorkItem>(64);
let bundle_js = server_bundle_js.clone();
let root = project_root.clone();
let handle = thread::Builder::new()
.name(format!("rex-v8-isolate-{i}"))
.stack_size(16 * 1024 * 1024) .spawn(move || {
crate::init_v8();
let mut isolate = match crate::SsrIsolate::new(
&bundle_js,
root.as_deref().map(|s| s.as_str()),
) {
Ok(iso) => iso,
Err(e) => {
error!("Failed to create V8 isolate {i}: {e:#}");
return;
}
};
debug!("V8 isolate {i} ready");
while let Ok(work) = rx.recv() {
work(&mut isolate);
}
debug!("V8 isolate {i} shutting down");
})?;
senders.push(Some(tx));
threads.push(Some(handle));
}
debug!(count = size, "V8 isolate pool created");
Ok(Self {
senders,
threads,
next: std::sync::atomic::AtomicUsize::new(0),
size,
})
}
pub async fn execute<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(&mut crate::SsrIsolate) -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let work: WorkItem = Box::new(move |isolate| {
let result = f(isolate);
let _ = tx.send(result);
});
let idx = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.size;
self.senders[idx]
.as_ref()
.ok_or_else(|| anyhow::anyhow!("V8 isolate pool is shut down"))?
.send(work)
.map_err(|_| anyhow::anyhow!("V8 isolate thread has shut down"))?;
rx.await
.map_err(|_| anyhow::anyhow!("V8 isolate dropped the response"))
}
pub async fn reload_all(&self, new_bundle: Arc<String>) -> Result<()> {
let mut handles = Vec::new();
for i in 0..self.size {
let bundle = new_bundle.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
let work: WorkItem = Box::new(move |isolate| {
let result = isolate.reload(&bundle);
let _ = tx.send(result);
});
self.senders[i]
.as_ref()
.ok_or_else(|| anyhow::anyhow!("V8 isolate pool is shut down"))?
.send(work)
.map_err(|_| anyhow::anyhow!("V8 isolate thread has shut down"))?;
handles.push(rx);
}
for handle in handles {
handle.await??;
}
debug!("All V8 isolates reloaded");
Ok(())
}
pub async fn load_rsc_bundles_all(
&self,
flight_bundle: Arc<String>,
ssr_bundle: Arc<String>,
) -> Result<()> {
let mut handles = Vec::new();
for i in 0..self.size {
let flight = flight_bundle.clone();
let ssr = ssr_bundle.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
let work: WorkItem = Box::new(move |isolate| {
let result = isolate.load_rsc_bundles(&flight, &ssr);
let _ = tx.send(result);
});
self.senders[i]
.as_ref()
.ok_or_else(|| anyhow::anyhow!("V8 isolate pool is shut down"))?
.send(work)
.map_err(|_| anyhow::anyhow!("V8 isolate thread has shut down"))?;
handles.push(rx);
}
for handle in handles {
handle.await??;
}
debug!("RSC bundles loaded into all V8 isolates");
Ok(())
}
}
impl Drop for IsolatePool {
fn drop(&mut self) {
for sender in &mut self.senders {
sender.take();
}
for (i, handle) in self.threads.iter_mut().enumerate() {
if let Some(h) = handle.take() {
if let Err(e) = h.join() {
warn!("V8 isolate thread {i} panicked: {e:?}");
}
}
}
debug!("V8 isolate pool shut down");
}
}