use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::mpsc;
#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpcRequest {
pub jsonrpc: Arc<str>,
pub id: u64,
pub method: Arc<str>,
pub params: Option<Value>,
}
/// A JSON-RPC response object as read back from the plugin process.
///
/// NOTE(review): `id` is a plain `u64`, so a response whose `id` is `null`
/// will fail to deserialize — confirm plugins never send one.
#[derive(Debug, Deserialize, Serialize)]
pub struct JsonRpcResponse {
    /// Protocol version tag.
    pub jsonrpc: Arc<str>,
    /// Identifier of the request this response answers.
    pub id: u64,
    /// Success payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Failure payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
}
/// The `error` member of a JSON-RPC response.
#[derive(Debug, Deserialize, Serialize)]
pub struct JsonRpcError {
    /// Numeric error code.
    pub code: i64,
    /// Human-readable description of the failure.
    pub message: Arc<str>,
    /// Optional extra detail; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<Value>,
}
/// Runs a plugin script in a child process and exchanges line-delimited
/// JSON-RPC messages with it over the child's stdin and stdout.
pub struct ShellBridge {
    /// Program used to launch the plugin script (`"node"` unless overridden).
    runtime: Arc<str>,
    /// Handle to the spawned process; `None` while the bridge is not running.
    child: Option<Child>,
    /// Id of the most recently issued request; incremented for every call.
    request_id: u64,
    /// Sender for messages that a background task writes to the child's stdin.
    stdin_tx: Option<mpsc::Sender<String>>,
    /// Receiver of trimmed, non-empty lines read from the child's stdout.
    stdout_rx: Option<mpsc::Receiver<String>>,
}
impl ShellBridge {
pub fn new(runtime: Option<Arc<str>>) -> Self {
Self {
runtime: runtime.unwrap_or_else(|| Arc::from("node")),
child: None,
request_id: 0,
stdin_tx: None,
stdout_rx: None,
}
}
pub async fn start(&mut self, plugin_path: &Path, entry_point: &str) -> Result<()> {
let script_path = plugin_path.join(entry_point);
if !script_path.exists() {
anyhow::bail!("Plugin entry point not found: {:?}", script_path);
}
let mut child = Command::new(self.runtime.as_ref())
.arg(&script_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.with_context(|| format!("Failed to spawn {} process", self.runtime))?;
let stdin = child.stdin.take().context("Failed to get stdin")?;
let stdout = child.stdout.take().context("Failed to get stdout")?;
let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(32);
let (stdout_tx, stdout_rx) = mpsc::channel::<String>(32);
tokio::spawn(async move {
let mut stdin = stdin;
while let Some(msg) = stdin_rx.recv().await {
if stdin.write_all(msg.as_bytes()).await.is_err() {
break;
}
if stdin.write_all(b"\n").await.is_err() {
break;
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {
let trimmed = line.trim().to_string();
if !trimmed.is_empty() {
let _ = stdout_tx.send(trimmed).await;
}
}
Err(_) => break,
}
}
});
self.child = Some(child);
self.stdin_tx = Some(stdin_tx);
self.stdout_rx = Some(stdout_rx);
Ok(())
}
pub async fn call(&mut self, method: &str, params: Option<Value>) -> Result<Value> {
self.request_id += 1;
let id = self.request_id;
let request = JsonRpcRequest {
jsonrpc: Arc::from("2.0"),
id,
method: Arc::from(method),
params,
};
let request_json = serde_json::to_string(&request)
.context("Failed to serialize request")?;
let stdin_tx = self.stdin_tx.as_ref()
.context("Shell bridge not started")?;
stdin_tx.send(request_json).await
.context("Failed to send request")?;
let stdout_rx = self.stdout_rx.as_mut()
.context("Shell bridge not started")?;
let response_json = tokio::time::timeout(
std::time::Duration::from_secs(30),
stdout_rx.recv()
).await
.context("Response timeout")?
.context("Failed to receive response")?;
let response: JsonRpcResponse = serde_json::from_str(&response_json)
.context("Failed to parse response")?;
if let Some(error) = response.error {
anyhow::bail!("Plugin error ({}): {}", error.code, error.message);
}
response.result.context("No result in response")
}
pub async fn stop(&mut self) -> Result<()> {
if let Some(mut child) = self.child.take() {
child.kill().await.context("Failed to kill process")?;
}
self.stdin_tx = None;
self.stdout_rx = None;
Ok(())
}
}
impl Drop for ShellBridge {
    /// Requests termination of the plugin process, if any, when the bridge
    /// goes out of scope.
    fn drop(&mut self) {
        let leftover = self.child.take();
        if let Some(mut process) = leftover {
            // `drop` cannot await, so only initiate the kill; a failure here
            // is deliberately ignored.
            let _ = process.start_kill();
        }
    }
}