use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
// JavaScript source for the worker, passed to `node -e`.
//
// Protocol (one request line in, one JSON response line out):
//   * each line read from stdin is evaluated with an indirect `eval`
//     (`(0, eval)(line)`);
//   * a number, string, boolean or `null` result is answered with
//     `{"ok":true,"value":<result>}`;
//   * any other result type is answered with
//     `{"ok":false,"error":"non-primitive"}`;
//   * a thrown exception is answered with `{"ok":false,"error":"<String(e)>"}`.
//
// SECURITY: every input line is executed as arbitrary JavaScript inside the
// Node process, so only trusted expressions should ever be sent to it.
const WORKER_JS: &str = r#"
const readline = require('readline');
const rl = readline.createInterface({ input: process.stdin });
rl.on('line', (line) => {
try {
const result = (0, eval)(line);
const type = typeof result;
if (type === 'number' || type === 'string' || type === 'boolean' || result === null) {
process.stdout.write(JSON.stringify({ ok: true, value: result }) + '\n');
} else {
process.stdout.write(JSON.stringify({ ok: false, error: 'non-primitive' }) + '\n');
}
} catch (e) {
process.stdout.write(JSON.stringify({ ok: false, error: String(e) }) + '\n');
}
});
"#;
/// Handle to a Node.js child process used to evaluate JavaScript expressions,
/// together with a cache of previously computed results.
pub struct NodeProcess {
/// The spawned `node` process; wrapped in `Option` so the handle can be
/// moved out and replaced when the worker is restarted.
child: Option<Child>,
/// Pipe connected to the worker's stdin; one expression is written per line.
stdin: Option<ChildStdin>,
/// Buffered reader over the worker's stdout; one response line is read per
/// request.
stdout: Option<BufReader<ChildStdout>>,
/// Memoized results keyed by expression text. A `None` entry records an
/// evaluation that produced no value.
cache: HashMap<String, Option<serde_json::Value>>,
}
impl NodeProcess {
pub fn spawn() -> std::io::Result<Self> {
let mut child = Command::new("node")
.arg("-e")
.arg(WORKER_JS)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()?;
let stdin = child.stdin.take();
let stdout = child.stdout.take().map(BufReader::new);
Ok(Self {
child: Some(child),
stdin,
stdout,
cache: HashMap::new(),
})
}
fn ensure_running(&mut self) -> bool {
if let Some(ref mut child) = self.child {
match child.try_wait() {
Ok(Some(_)) => {
tracing::warn!("Node.js process died, restarting");
}
Ok(None) => return true, Err(_) => {}
}
}
match Self::spawn() {
Ok(mut new_proc) => {
if let Some(ref mut child) = self.child {
let _ = child.kill();
}
self.child = std::mem::take(&mut new_proc.child);
self.stdin = std::mem::take(&mut new_proc.stdin);
self.stdout = std::mem::take(&mut new_proc.stdout);
true
}
Err(e) => {
tracing::error!("Failed to spawn Node.js: {}", e);
false
}
}
}
pub fn eval(&mut self, expr: &str) -> Option<serde_json::Value> {
if let Some(cached) = self.cache.get(expr) {
return cached.clone();
}
let result = self.eval_uncached(expr);
self.cache.insert(expr.to_string(), result.clone());
result
}
fn eval_uncached(&mut self, expr: &str) -> Option<serde_json::Value> {
if !self.ensure_running() {
return None;
}
let stdin = self.stdin.as_mut()?;
writeln!(stdin, "{expr}").ok()?;
stdin.flush().ok()?;
let stdout = self.stdout.as_mut()?;
let mut line = String::new();
match stdout.read_line(&mut line) {
Ok(0) => {
tracing::warn!("Node.js returned EOF");
return None;
}
Ok(_) => {}
Err(e) => {
tracing::warn!("Node.js read error: {}", e);
return None;
}
}
let response: serde_json::Value = serde_json::from_str(line.trim()).ok()?;
if response.get("ok")?.as_bool()? {
Some(response.get("value")?.clone())
} else {
None
}
}
pub fn is_healthy(&mut self) -> bool {
if let Some(ref mut child) = self.child {
matches!(child.try_wait(), Ok(None))
} else {
false
}
}
}
impl Drop for NodeProcess {
fn drop(&mut self) {
if let Some(ref mut child) = self.child {
let _ = child.kill();
}
}
}