use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use futures::{Stream, StreamExt};
use objectiveai_sdk::cli::command::plugins::run::{Request, ResponseItem};
use objectiveai_sdk::cli::plugins::Output as PluginOutput;
use objectiveai_sdk::cli::{Error as CliError, ErrorType as CliErrorType};
use serde::Serialize;
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStdin, Command};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use crate::child_io::{PipeEvent, spawn_pipe_reader};
use crate::context::Context;
use crate::error::Error;
/// Boxed, pinned, `Send` stream of `Result<ResponseItem, Error>`; this is the
/// stream type `execute` hands back to its caller.
type ItemStream = Pin<Box<dyn Stream<Item = Result<ResponseItem, Error>> + Send>>;
pub async fn execute(ctx: &Context, request: Request) -> Result<ItemStream, Error> {
let coord = format!("{}/{}/{}", request.owner, request.name, request.version);
let exe = ctx
.filesystem
.resolve_plugin(&request.owner, &request.name, &request.version)
.await
.ok_or_else(|| Error::PluginNotFound(coord.clone()))?;
let mut nested_ctx = ctx.clone();
nested_ctx.config.plugin_owner = Some(request.owner.clone());
nested_ctx.config.plugin_repository = Some(request.name.clone());
nested_ctx.config.plugin_version = Some(request.version.clone());
nested_ctx.plugin = Some(crate::plugin_path::PluginPath {
owner: request.owner.clone(),
repository: request.name.clone(),
version: request.version.clone(),
});
let mut cmd = Command::new(&exe);
cmd.args(&request.args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
crate::spawn::apply_config_env(&mut cmd, &nested_ctx.config);
let mut child = cmd.spawn().map_err(Error::PluginSpawn)?;
let stdout = child.stdout.take().expect("stdout was piped");
let stderr = child.stderr.take().expect("stderr was piped");
let stdin = child.stdin.take().expect("stdin was piped");
let plugin_stdin: Arc<Mutex<ChildStdin>> = Arc::new(Mutex::new(stdin));
let mut events = spawn_pipe_reader(stdout, stderr);
let stream = async_stream::stream! {
let mut command_tasks: Vec<(Option<String>, JoinHandle<i32>)> = Vec::new();
while let Some(event) = events.recv().await {
match event {
PipeEvent::Stderr(_) => {
yield Ok(ResponseItem::Error(CliError {
r#type: CliErrorType::Error,
level: None,
fatal: None,
message: serde_json::Value::Null,
}));
}
PipeEvent::Stdout(trimmed) => {
match serde_json::from_str::<PluginOutput>(&trimmed) {
Ok(PluginOutput::Error(e)) => {
yield Ok(ResponseItem::Error(e));
}
Ok(PluginOutput::Mcp(mcp)) => {
yield Ok(ResponseItem::Mcp(mcp));
}
Ok(PluginOutput::Command(c)) => {
let task_id = Some(c.id);
let task = run_nested_command(
nested_ctx.clone(),
c.command,
plugin_stdin.clone(),
task_id.clone(),
);
command_tasks.push((task_id, task));
}
Ok(PluginOutput::Notification(value)) => {
yield Ok(ResponseItem::Notification(value));
}
Err(_) => {
yield Ok(ResponseItem::Notification(
serde_json::Value::String(trimmed),
));
}
}
}
PipeEvent::StdoutEof | PipeEvent::StderrEof => {}
PipeEvent::StdoutErr(e) | PipeEvent::StderrErr(e) => {
yield Err(Error::PluginRead(e));
return;
}
}
}
for (id, task) in command_tasks {
let exit_code = task.await.unwrap_or(-1);
let envelope = PluginCommandResponse {
id: id.as_deref(),
value: CommandComplete {
kind: "command_complete",
exit_code,
},
};
let _ = write_envelope(&plugin_stdin, &envelope).await;
}
drop(plugin_stdin);
match child.wait().await {
Ok(status) if status.success() => {}
Ok(status) => {
yield Err(Error::PluginExit(status.code().unwrap_or(1)));
}
Err(e) => {
yield Err(Error::PluginRead(e));
}
}
};
Ok(Box::pin(stream))
}
fn run_nested_command(
ctx: Context,
command: String,
plugin_stdin: Arc<Mutex<ChildStdin>>,
id: Option<String>,
) -> JoinHandle<i32> {
tokio::spawn(async move {
let id = id.as_deref();
let tokens: Vec<String> = command.split_whitespace().map(String::from).collect();
let forbidden = match tokens.first().map(String::as_str) {
Some("plugins") => Some("plugins"),
Some("tools") => Some("tools"),
_ => None,
};
if let Some(kind) = forbidden {
let _ = forward_error(
&plugin_stdin,
id,
&Error::PluginCommandForbidden(kind),
Some(true),
)
.await;
return 1;
}
let mut args: Vec<String> = vec!["objectiveai-cli".to_string()];
args.extend(tokens);
let mut stream = match crate::run(args, Some(ctx)).await {
Ok(s) => s,
Err(e) => {
if let Error::ClapParse(ref clap_err) = e {
if crate::is_informational(clap_err) {
let _ = forward_help(&plugin_stdin, id, &clap_err.to_string()).await;
return 0;
}
}
let _ = forward_error(&plugin_stdin, id, &e, Some(true)).await;
return match e {
Error::ToolExit(code) => code,
_ => 1,
};
}
};
let mut last_tool_exit: Option<i32> = None;
while let Some(item) = stream.next().await {
let written = match item {
Ok(response) => forward_line(&plugin_stdin, id, &response).await,
Err(e) => {
if let Error::ToolExit(code) = &e {
last_tool_exit = Some(*code);
}
forward_error(&plugin_stdin, id, &e, None).await
}
};
if written.is_err() {
break;
}
}
last_tool_exit.unwrap_or(0)
})
}
/// Wraps `value` in a `PluginCommandResponse` carrying `id` and writes it to
/// the plugin's stdin via `write_envelope`.
///
/// Returns whatever I/O result `write_envelope` reports.
async fn forward_line<T: Serialize>(
    plugin_stdin: &Arc<Mutex<ChildStdin>>,
    id: Option<&str>,
    value: &T,
) -> std::io::Result<()> {
    let envelope = PluginCommandResponse { id, value };
    write_envelope(plugin_stdin, &envelope).await
}
async fn forward_error(
plugin_stdin: &Arc<Mutex<ChildStdin>>,
id: Option<&str>,
e: &Error,
fatal: Option<bool>,
) -> std::io::Result<()> {
let payload = CliError {
r#type: CliErrorType::Error,
level: Some(objectiveai_sdk::cli::Level::Error),
fatal,
message: e.output_message(),
};
forward_line(plugin_stdin, id, &payload).await
}
/// Sends help text to the plugin as a `{"type": "help", "help": <text>}`
/// payload inside an envelope tagged with `id`.
async fn forward_help(
    plugin_stdin: &Arc<Mutex<ChildStdin>>,
    id: Option<&str>,
    help: &str,
) -> std::io::Result<()> {
    // Keys are inserted in the same order as the original object literal.
    let mut fields = serde_json::Map::new();
    fields.insert("type".to_string(), serde_json::Value::from("help"));
    fields.insert("help".to_string(), serde_json::Value::from(help));
    let payload = serde_json::Value::Object(fields);
    forward_line(plugin_stdin, id, &payload).await
}
/// Serializes `envelope` as a single JSON line and writes it to the plugin's
/// stdin, flushing afterwards.
///
/// The stdin mutex is held for the whole write so concurrent writers cannot
/// interleave their lines.
///
/// # Errors
///
/// Returns an `io::Error` if serialization fails (converted from the
/// `serde_json::Error` rather than panicking) or if writing/flushing the pipe
/// fails.
async fn write_envelope<T: Serialize>(
    stdin: &Arc<Mutex<ChildStdin>>,
    envelope: &T,
) -> std::io::Result<()> {
    // Serialize before taking the lock; `?` uses serde_json's
    // `From<serde_json::Error> for io::Error`.
    let mut line = serde_json::to_string(envelope)?;
    // Append the terminator up front so the whole line goes out in one
    // `write_all` call instead of two.
    line.push('\n');

    let mut guard = stdin.lock().await;
    guard.write_all(line.as_bytes()).await?;
    guard.flush().await
}
/// Envelope serialized onto the plugin's stdin: an optional correlation `id`
/// plus an arbitrary serializable `value`.
#[derive(Serialize)]
struct PluginCommandResponse<'a, T> {
// Left out of the JSON entirely when there is no id.
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<&'a str>,
// The payload being delivered (a response item, an error, help text, or a
// completion marker, depending on the caller).
value: T,
}
/// Payload announcing that a nested command has finished, carrying its exit
/// code.
#[derive(Serialize)]
struct CommandComplete {
// Serialized under the JSON key `type`; `execute` sets it to
// "command_complete".
#[serde(rename = "type")]
kind: &'static str,
// Value the command's task resolved to; `execute` substitutes -1 when the
// task could not be joined.
exit_code: i32,
}
/// Handler that reports the JSON Schema of the `plugins::run` request type.
pub mod request_schema {
    use objectiveai_sdk::cli::command::plugins::run::request_schema as endpoint;
    use objectiveai_sdk::cli::command::plugins::run::Request as RunRequest;
    use objectiveai_sdk::cli::command::ResponseSchema;

    use crate::context::Context;
    use crate::error::Error;

    /// Generates the schema for `plugins::run::Request` with
    /// `schemars::schema_for!` and wraps it in `ResponseSchema`.
    ///
    /// Neither argument is used, and this function itself never returns
    /// `Err`.
    pub async fn execute(
        _ctx: &Context,
        _request: endpoint::Request,
    ) -> Result<endpoint::Response, Error> {
        let schema = schemars::schema_for!(RunRequest);
        Ok(ResponseSchema(schema))
    }
}
/// Handler that reports the JSON Schema of the `plugins::run` response item
/// type.
pub mod response_schema {
    use objectiveai_sdk::cli::command::plugins::run::response_schema as endpoint;
    use objectiveai_sdk::cli::command::plugins::run::ResponseItem as RunResponseItem;
    use objectiveai_sdk::cli::command::ResponseSchema;

    use crate::context::Context;
    use crate::error::Error;

    /// Generates the schema for `plugins::run::ResponseItem` with
    /// `schemars::schema_for!` and wraps it in `ResponseSchema`.
    ///
    /// Neither argument is used, and this function itself never returns
    /// `Err`.
    pub async fn execute(
        _ctx: &Context,
        _request: endpoint::Request,
    ) -> Result<endpoint::Response, Error> {
        let schema = schemars::schema_for!(RunResponseItem);
        Ok(ResponseSchema(schema))
    }
}