use std::{
collections::HashMap,
process::Stdio,
sync::{
Arc,
atomic::{AtomicI64, Ordering},
},
time::Duration,
};
use anyhow::{Context, Result, bail};
use serde_json::{Value, json};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
process::{Child, ChildStdin, Command},
sync::{Mutex, oneshot},
task::JoinHandle,
time,
};
use tracing::{debug, error, warn};
use super::manifest::PluginManifest;
const DEFAULT_CALL_TIMEOUT_SECS: u64 = 30;
#[derive(Clone)]
pub struct ShellBridgePlugin {
pub name: String,
stdin: Arc<Mutex<ChildStdin>>,
child: Arc<Mutex<Child>>,
next_id: Arc<AtomicI64>,
timeout: Duration,
pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>>,
reader_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
impl ShellBridgePlugin {
pub async fn spawn(
manifest: &PluginManifest,
host_dispatch: Arc<crate::plugin::host_methods::HostMethodRegistry>,
) -> Result<Self> {
let runtime = resolve_runtime(&manifest.runtime)?;
let entry = manifest.dir.join(&manifest.entry);
if !entry.exists() {
bail!(
"plugin `{}` entry not found: {}",
manifest.name,
entry.display()
);
}
let mut child = Command::new(&runtime)
.arg(&entry)
.current_dir(&manifest.dir)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.kill_on_drop(true)
.spawn()
.with_context(|| {
format!(
"spawn plugin `{}` with {runtime} {}",
manifest.name,
entry.display()
)
})?;
let stdin = child.stdin.take().context("plugin stdin")?;
let stdout = child.stdout.take().context("plugin stdout")?;
debug!(plugin = %manifest.name, runtime, "plugin subprocess started");
let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>> =
Arc::new(Mutex::new(HashMap::new()));
let stdin_arc = Arc::new(Mutex::new(stdin));
let reader_pending = pending.clone();
let reader_stdin = stdin_arc.clone();
let reader_dispatch = host_dispatch.clone();
let reader_name = manifest.name.clone();
let reader_handle = tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
debug!(plugin = %reader_name, "plugin stdout closed (EOF)");
break;
}
Ok(_) => {
if let Err(e) = handle_incoming(
line.trim_end_matches('\n'),
&reader_pending,
reader_stdin.clone(),
&reader_dispatch,
&reader_name,
)
.await
{
warn!(plugin = %reader_name, "incoming dispatch error: {e:#}");
}
}
Err(e) => {
error!(plugin = %reader_name, "stdout read error: {e:#}");
break;
}
}
}
});
let timeout = manifest
.timeout_ms
.map(Duration::from_millis)
.unwrap_or(Duration::from_secs(DEFAULT_CALL_TIMEOUT_SECS));
Ok(Self {
name: manifest.name.clone(),
stdin: stdin_arc,
child: Arc::new(Mutex::new(child)),
next_id: Arc::new(AtomicI64::new(1)),
timeout,
pending,
reader_handle: Arc::new(Mutex::new(Some(reader_handle))),
})
}
pub async fn call(&self, method: &str, params: Value) -> Result<Value> {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let request = json!({
"id": id,
"method": method,
"params": params,
});
let line = serde_json::to_string(&request).context("serialize request")?;
let (tx, rx) = oneshot::channel();
self.pending.lock().await.insert(id, tx);
let write_result: Result<()> = {
let mut stdin = self.stdin.lock().await;
async {
stdin
.write_all(line.as_bytes())
.await
.context("write request")?;
stdin.write_all(b"\n").await.context("write newline")?;
stdin.flush().await.context("flush stdin")?;
Ok(())
}
.await
};
if let Err(e) = write_result {
self.pending.lock().await.remove(&id);
return Err(e).with_context(|| format!("write to plugin `{}`", self.name));
}
match time::timeout(self.timeout, rx).await {
Ok(Ok(Ok(v))) => Ok(v),
Ok(Ok(Err(e))) => bail!("plugin `{}` error: {e}", self.name),
Ok(Err(_canceled)) => {
self.pending.lock().await.remove(&id);
bail!("plugin `{}` reader task ended unexpectedly", self.name);
}
Err(_timeout) => {
self.pending.lock().await.remove(&id);
bail!(
"plugin `{}` call `{method}` timed out after {}s",
self.name,
self.timeout.as_secs()
);
}
}
}
pub async fn shutdown(&self) {
let mut child = self.child.lock().await;
let _ = child.kill().await;
drop(child);
if let Some(handle) = self.reader_handle.lock().await.take() {
let _ = handle.await;
}
debug!(plugin = %self.name, "plugin subprocess terminated");
}
}
async fn handle_incoming(
line: &str,
pending: &Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>,
stdin: Arc<Mutex<ChildStdin>>,
host_dispatch: &Arc<crate::plugin::host_methods::HostMethodRegistry>,
plugin_name: &str,
) -> Result<()> {
let msg: Value = serde_json::from_str(line)
.with_context(|| format!("plugin `{plugin_name}` invalid JSON: {line}"))?;
let id = msg["id"]
.as_i64()
.ok_or_else(|| anyhow::anyhow!("plugin `{plugin_name}` message has no integer id"))?;
if id == 0 {
bail!("plugin `{plugin_name}` sent id=0, which is reserved");
}
if id > 0 {
let mut map = pending.lock().await;
if let Some(tx) = map.remove(&id) {
let result = if let Some(err) = msg.get("error") {
Err(err.to_string())
} else {
Ok(msg.get("result").cloned().unwrap_or(Value::Null))
};
let _ = tx.send(result);
} else {
warn!(
plugin = %plugin_name,
id,
"response with no pending request — dropping"
);
}
Ok(())
} else {
let method = msg["method"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("plugin `{plugin_name}` request id={id} has no method"))?
.to_owned();
let params = msg.get("params").cloned().unwrap_or(Value::Null);
let dispatch = host_dispatch.clone();
let name = plugin_name.to_owned();
tokio::spawn(async move {
let result = dispatch.handle(&method, params).await;
let response = match result {
Ok(v) => json!({ "id": id, "result": v }),
Err(e) => json!({ "id": id, "error": e.to_string() }),
};
match serde_json::to_string(&response) {
Ok(line) => {
let mut sin = stdin.lock().await;
if let Err(e) = sin.write_all(line.as_bytes()).await {
warn!(plugin = %name, "failed to write response: {e}");
return;
}
if let Err(e) = sin.write_all(b"\n").await {
warn!(plugin = %name, "failed to write newline: {e}");
return;
}
if let Err(e) = sin.flush().await {
warn!(plugin = %name, "failed to flush: {e}");
}
}
Err(e) => {
warn!(plugin = %name, "failed to serialize response: {e}");
}
}
});
Ok(())
}
}
fn resolve_runtime(runtime: &str) -> Result<String> {
let candidates = match runtime {
"bun" => vec!["bun"],
"deno" => vec!["deno"],
"node" => vec!["node"],
other => vec![other],
};
let tools_dir = crate::config::loader::base_dir().join("tools/node/bin");
if tools_dir.exists() {
for candidate in &candidates {
let bin = tools_dir.join(candidate);
if bin.exists() {
return Ok(bin.to_string_lossy().to_string());
}
}
}
#[cfg(target_os = "windows")]
{
let tools_dir_win = crate::config::loader::base_dir().join("tools/node");
if tools_dir_win.exists() {
for candidate in &candidates {
let bin = tools_dir_win.join(format!("{candidate}.exe"));
if bin.exists() {
return Ok(bin.to_string_lossy().to_string());
}
}
}
}
for candidate in &candidates {
if which::which(candidate).is_ok() {
return Ok(candidate.to_string());
}
}
bail!(
"no suitable shell-plugin runtime found for `{runtime}`. \
Run `rsclaw tools install node`, download from https://gitfast.io, or install node/bun/deno manually."
)
}
pub struct Plugin {
inner: ShellBridgePlugin,
pub manifest: PluginManifest,
}
impl Plugin {
pub async fn spawn(
manifest: PluginManifest,
host_dispatch: Arc<crate::plugin::host_methods::HostMethodRegistry>,
) -> Result<Self> {
let inner = ShellBridgePlugin::spawn(&manifest, host_dispatch).await?;
Ok(Self { inner, manifest })
}
pub async fn call(&self, method: &str, params: Value) -> Result<Value> {
self.inner.call(method, params).await
}
pub async fn shutdown(&self) {
self.inner.shutdown().await;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn resolve_node_runtime() {
let res = resolve_runtime("node");
if which::which("node").is_ok() {
assert!(res.is_ok());
}
}
#[test]
fn resolve_unknown_runtime_fails() {
assert!(resolve_runtime("__nonexistent_runtime_xyz__").is_err());
}
}