use std::{
cell::RefCell,
rc::Rc,
sync::mpsc::{channel, Receiver, Sender},
thread::{spawn, JoinHandle},
};
use crate::{Error, RuntimeOptions};
/// A fixed set of [`Worker`]s that are handed out in round-robin order.
///
/// Workers are stored as `Rc<RefCell<_>>`, so the pool and the handles it
/// returns cannot be sent to another thread (`Rc` is not `Send`).
pub struct WorkerPool<W>
where
W: InnerWorker,
{
/// The workers owned by the pool, in creation order.
workers: Vec<Rc<RefCell<Worker<W>>>>,
/// Index into `workers` of the worker that `next_worker` returns next.
next_worker: usize,
/// The options that were cloned to create every worker in the pool.
options: W::RuntimeOptions,
}
impl<W> WorkerPool<W>
where
W: InnerWorker,
{
pub fn new(options: W::RuntimeOptions, n_workers: u32) -> Result<Self, Error> {
crate::init_platform(n_workers, true);
let mut workers = Vec::with_capacity(n_workers as usize + 1);
for _ in 0..n_workers {
workers.push(Rc::new(RefCell::new(Worker::new(options.clone())?)));
}
Ok(Self {
workers,
next_worker: 0,
options,
})
}
#[must_use]
pub fn options(&self) -> &W::RuntimeOptions {
&self.options
}
pub fn shutdown(self) {
for worker in self.workers {
worker.borrow_mut().shutdown();
}
}
#[must_use]
pub fn len(&self) -> usize {
self.workers.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.workers.is_empty()
}
#[must_use]
pub fn worker_by_id(&self, id: usize) -> Option<Rc<RefCell<Worker<W>>>> {
Some(Rc::clone(self.workers.get(id)?))
}
pub fn next_worker(&mut self) -> Rc<RefCell<Worker<W>>> {
let worker = &self.workers[self.next_worker];
self.next_worker = (self.next_worker + 1) % self.workers.len();
Rc::clone(worker)
}
pub fn send_and_await(&mut self, query: W::Query) -> Result<W::Response, Error> {
self.next_worker().borrow().send_and_await(query)
}
#[must_use = "The returned thread handle will return a Result<T, Error> when joined"]
pub fn eval_in_thread<T>(code: String) -> std::thread::JoinHandle<Result<T, Error>>
where
T: serde::de::DeserializeOwned + Send + 'static,
{
deno_core::JsRuntime::init_platform(None, true);
std::thread::spawn(move || {
let mut runtime = crate::Runtime::new(RuntimeOptions::default())?;
runtime.eval(&code)
})
}
}
/// Handle to a single worker thread.
///
/// Queries are sent to the thread over one channel and its responses are read
/// back from another.
pub struct Worker<W>
where
W: InnerWorker,
{
/// Join handle of the worker thread; `None` once the worker has been shut down.
handle: Option<JoinHandle<()>>,
/// Sending half of the query channel; `None` once the worker has been shut down.
tx: Option<Sender<W::Query>>,
/// Receiving half of the response channel.
rx: Receiver<W::Response>,
}
impl<W> Worker<W>
where
W: InnerWorker,
{
pub fn new(options: W::RuntimeOptions) -> Result<Self, Error> {
let (qtx, qrx) = channel();
let (rtx, rrx) = channel();
let (init_tx, init_rx) = channel::<Option<Error>>();
let handle = spawn(move || {
let rx = qrx;
let tx = rtx;
let itx = init_tx;
let runtime = match W::init_runtime(options) {
Ok(rt) => rt,
Err(e) => {
itx.send(Some(e)).ok(); return;
}
};
if itx.send(None).is_ok() {
W::thread(runtime, rx, tx);
}
});
let worker = Self {
handle: Some(handle),
tx: Some(qtx),
rx: rrx,
};
match init_rx.recv() {
Ok(None) => Ok(worker),
Ok(Some(e)) => Err(e),
_ => {
let Some(handle) = worker.handle else {
return Err(Error::Runtime(
"Could not start runtime thread: Worker handle missing".to_string(),
));
};
let Err(e) = handle.join() else {
return Err(Error::Runtime("Could not start runtime thread".to_string()));
};
let e = if let Some(e) = e.downcast_ref::<String>() {
e.clone()
} else if let Some(e) = e.downcast_ref::<&str>() {
(*e).to_string()
} else {
"Could not start runtime thread".to_string()
};
let e = match e.split("Stack backtrace").next() {
Some(e) => e.trim(),
None => &e,
}
.to_string();
Err(Error::Runtime(e))
}
}
}
pub fn shutdown(&mut self) {
if let (Some(tx), Some(hnd)) = (self.tx.take(), self.handle.take()) {
drop(tx);
hnd.join().ok();
}
}
pub fn send(&self, query: W::Query) -> Result<(), Error> {
match &self.tx {
None => return Err(Error::WorkerHasStopped),
Some(tx) => tx,
}
.send(query)
.map_err(|e| Error::Runtime(e.to_string()))
}
pub fn receive(&self) -> Result<W::Response, Error> {
self.rx.recv().map_err(|e| Error::Runtime(e.to_string()))
}
pub fn try_receive(&self) -> Result<Option<W::Response>, Error> {
match self.rx.try_recv() {
Ok(v) => Ok(Some(v)),
Err(e) => match e {
std::sync::mpsc::TryRecvError::Empty => Ok(None),
std::sync::mpsc::TryRecvError::Disconnected => Err(Error::Runtime(e.to_string())),
},
}
}
pub fn send_and_await(&self, query: W::Query) -> Result<W::Response, Error> {
self.send(query)?;
self.receive()
}
pub fn join(mut self) -> Result<(), Error> {
self.shutdown();
match self.handle {
Some(hnd) => hnd
.join()
.map_err(|_| Error::Runtime("Worker thread panicked".to_string())),
None => Ok(()),
}
}
}
/// Describes the work performed on a worker thread: how its runtime is
/// created and how each query is turned into a response.
///
/// The options, queries and responses cross thread boundaries and therefore
/// must be `Send + 'static`. Options must also be `Clone`.
pub trait InnerWorker
where
    Self: Send,
    <Self as InnerWorker>::RuntimeOptions: Send + 'static + Clone,
    <Self as InnerWorker>::Query: Send + 'static,
    <Self as InnerWorker>::Response: Send + 'static,
{
    /// State owned by the worker thread and passed to every `handle_query` call.
    type Runtime;

    /// Options consumed by `init_runtime`.
    type RuntimeOptions;

    /// Message type sent to the worker thread.
    type Query;

    /// Message type sent back from the worker thread.
    type Response;

    /// Builds the runtime from `options`.
    ///
    /// # Errors
    /// Returns an error if the runtime cannot be created.
    fn init_runtime(options: Self::RuntimeOptions) -> Result<Self::Runtime, Error>;

    /// Produces the response for a single query.
    fn handle_query(runtime: &mut Self::Runtime, query: Self::Query) -> Self::Response;

    /// The worker loop: answers queries from `rx` on `tx` until either channel
    /// is closed by the other side.
    fn thread(mut runtime: Self::Runtime, rx: Receiver<Self::Query>, tx: Sender<Self::Response>) {
        // `recv` fails once every query sender is gone.
        while let Ok(query) = rx.recv() {
            let reply = Self::handle_query(&mut runtime, query);
            // Nobody is listening for responses any more; stop working.
            if tx.send(reply).is_err() {
                return;
            }
        }
    }
}
/// A worker whose thread owns a `crate::Runtime` and answers
/// [`DefaultWorkerQuery`] messages with [`DefaultWorkerResponse`] messages.
pub struct DefaultWorker(Worker<DefaultWorker>);
impl InnerWorker for DefaultWorker {
type Runtime = (
crate::Runtime,
std::collections::HashMap<deno_core::ModuleId, crate::ModuleHandle>,
);
type RuntimeOptions = DefaultWorkerOptions;
type Query = DefaultWorkerQuery;
type Response = DefaultWorkerResponse;
fn init_runtime(options: Self::RuntimeOptions) -> Result<Self::Runtime, Error> {
let runtime = crate::Runtime::new(crate::RuntimeOptions {
default_entrypoint: options.default_entrypoint,
timeout: options.timeout,
shared_array_buffer_store: options.shared_array_buffer_store,
startup_snapshot: options.startup_snapshot,
..Default::default()
})?;
let modules = std::collections::HashMap::new();
Ok((runtime, modules))
}
fn handle_query(runtime: &mut Self::Runtime, query: Self::Query) -> Self::Response {
let (runtime, modules) = runtime;
match query {
DefaultWorkerQuery::Eval(code) => match runtime.eval(&code) {
Ok(v) => Self::Response::Value(v),
Err(e) => Self::Response::Error(e),
},
DefaultWorkerQuery::LoadMainModule(module) => {
match runtime.load_modules(&module, vec![]) {
Ok(handle) => {
let id = handle.id();
modules.insert(id, handle);
Self::Response::ModuleId(id)
}
Err(e) => Self::Response::Error(e),
}
}
DefaultWorkerQuery::LoadModule(module) => match runtime.load_module(&module) {
Ok(handle) => {
let id = handle.id();
modules.insert(id, handle);
Self::Response::ModuleId(id)
}
Err(e) => Self::Response::Error(e),
},
DefaultWorkerQuery::CallEntrypoint(id, args) => match modules.get(&id) {
Some(handle) => match runtime.call_entrypoint(handle, &args) {
Ok(v) => Self::Response::Value(v),
Err(e) => Self::Response::Error(e),
},
None => Self::Response::Error(Error::Runtime("Module not found".to_string())),
},
DefaultWorkerQuery::CallFunction(id, name, args) => {
let handle = if let Some(id) = id {
match modules.get(&id) {
Some(handle) => Some(handle),
None => {
return Self::Response::Error(Error::Runtime(
"Module not found".to_string(),
))
}
}
} else {
None
};
match runtime.call_function(handle, &name, &args) {
Ok(v) => Self::Response::Value(v),
Err(e) => Self::Response::Error(e),
}
}
DefaultWorkerQuery::GetValue(id, name) => {
let handle = if let Some(id) = id {
match modules.get(&id) {
Some(handle) => Some(handle),
None => {
return Self::Response::Error(Error::Runtime(
"Module not found".to_string(),
))
}
}
} else {
None
};
match runtime.get_value(handle, &name) {
Ok(v) => Self::Response::Value(v),
Err(e) => Self::Response::Error(e),
}
}
}
}
}
impl DefaultWorker {
pub fn new(options: DefaultWorkerOptions) -> Result<Self, Error> {
Worker::new(options).map(Self)
}
#[must_use]
pub fn as_worker(&self) -> &Worker<DefaultWorker> {
&self.0
}
pub fn eval<T>(&self, code: String) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
match self.0.send_and_await(DefaultWorkerQuery::Eval(code))? {
DefaultWorkerResponse::Value(v) => Ok(crate::serde_json::from_value(v)?),
DefaultWorkerResponse::Error(e) => Err(e),
_ => Err(Error::Runtime(
"Unexpected response from the worker".to_string(),
)),
}
}
pub fn load_main_module(&self, module: crate::Module) -> Result<deno_core::ModuleId, Error> {
match self
.0
.send_and_await(DefaultWorkerQuery::LoadMainModule(module))?
{
DefaultWorkerResponse::ModuleId(id) => Ok(id),
DefaultWorkerResponse::Error(e) => Err(e),
_ => Err(Error::Runtime(
"Unexpected response from the worker".to_string(),
)),
}
}
pub fn load_module(&self, module: crate::Module) -> Result<deno_core::ModuleId, Error> {
match self
.0
.send_and_await(DefaultWorkerQuery::LoadModule(module))?
{
DefaultWorkerResponse::ModuleId(id) => Ok(id),
DefaultWorkerResponse::Error(e) => Err(e),
_ => Err(Error::Runtime(
"Unexpected response from the worker".to_string(),
)),
}
}
pub fn call_entrypoint<T>(
&self,
id: deno_core::ModuleId,
args: Vec<crate::serde_json::Value>,
) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
match self
.0
.send_and_await(DefaultWorkerQuery::CallEntrypoint(id, args))?
{
DefaultWorkerResponse::Value(v) => {
crate::serde_json::from_value(v).map_err(Error::from)
}
DefaultWorkerResponse::Error(e) => Err(e),
_ => Err(Error::Runtime(
"Unexpected response from the worker".to_string(),
)),
}
}
pub fn call_function<T>(
&self,
module_context: Option<deno_core::ModuleId>,
name: String,
args: Vec<crate::serde_json::Value>,
) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
match self
.0
.send_and_await(DefaultWorkerQuery::CallFunction(module_context, name, args))?
{
DefaultWorkerResponse::Value(v) => {
crate::serde_json::from_value(v).map_err(Error::from)
}
DefaultWorkerResponse::Error(e) => Err(e),
_ => Err(Error::Runtime(
"Unexpected response from the worker".to_string(),
)),
}
}
pub fn get_value<T>(
&self,
module_context: Option<deno_core::ModuleId>,
name: String,
) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
match self
.0
.send_and_await(DefaultWorkerQuery::GetValue(module_context, name))?
{
DefaultWorkerResponse::Value(v) => {
crate::serde_json::from_value(v).map_err(Error::from)
}
DefaultWorkerResponse::Error(e) => Err(e),
_ => Err(Error::Runtime(
"Unexpected response from the worker".to_string(),
)),
}
}
}
impl AsRef<Worker<DefaultWorker>> for DefaultWorker {
fn as_ref(&self) -> &Worker<DefaultWorker> {
&self.0
}
}
/// Options used to build the runtime of a [`DefaultWorker`].
///
/// Each field is moved into the `crate::RuntimeOptions` field of the same
/// name; every other runtime option keeps its default value.
#[derive(Default, Clone)]
pub struct DefaultWorkerOptions {
/// Forwarded to `RuntimeOptions::default_entrypoint`.
pub default_entrypoint: Option<String>,
/// Forwarded to `RuntimeOptions::timeout`.
/// NOTE(review): the derived `Default` makes this a zero duration; confirm
/// how the runtime interprets that.
pub timeout: std::time::Duration,
/// Forwarded to `RuntimeOptions::startup_snapshot`.
pub startup_snapshot: Option<&'static [u8]>,
/// Forwarded to `RuntimeOptions::shared_array_buffer_store`.
pub shared_array_buffer_store: Option<deno_core::SharedArrayBufferStore>,
}
/// Requests understood by a [`DefaultWorker`] thread.
#[derive(Debug, Clone)]
pub enum DefaultWorkerQuery {
/// Evaluate the given source string with the runtime's `eval`.
Eval(String),
/// Load the module with the runtime's `load_modules` (no side modules) and
/// remember its handle on the worker thread.
LoadMainModule(crate::Module),
/// Load the module with the runtime's `load_module` and remember its handle
/// on the worker thread.
LoadModule(crate::Module),
/// Call the entrypoint of a previously loaded module with the given arguments.
CallEntrypoint(deno_core::ModuleId, Vec<crate::serde_json::Value>),
/// Call a function by name with the given arguments. The optional module id
/// selects a previously loaded module as context.
CallFunction(
Option<deno_core::ModuleId>,
String,
Vec<crate::serde_json::Value>,
),
/// Read a value by name. The optional module id selects a previously loaded
/// module as context.
GetValue(Option<deno_core::ModuleId>, String),
}
/// Replies produced by a [`DefaultWorker`] thread.
#[derive(Debug, Clone)]
pub enum DefaultWorkerResponse {
/// JSON value produced by an eval, entrypoint call, function call, or value lookup.
Value(crate::serde_json::Value),
/// Id of a module that was just loaded on the worker thread.
ModuleId(deno_core::ModuleId),
/// Success without a payload.
/// NOTE(review): `DefaultWorker::handle_query` never produces this variant.
Ok(()),
/// The query failed with the contained error.
Error(Error),
}