use anyhow::{Context, Result};
use serde_json::{json, Value};
use std::io::{self, BufRead, BufReader, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use uuid::Uuid;
use super::types::*;
use super::{chat_session::ChatSession, skills};
/// Event sink that reports agent activity as JSON event lines on stdout and
/// reads interactive replies (confirmations, clarifications) from stdin.
struct RpcEventSink {
/// Shared stdout handle; every emitted event is written and flushed here.
writer: Arc<Mutex<io::Stdout>>,
/// Shared buffered stdin reader used to read the client's reply to a
/// confirmation or clarification request. The same reader is also used by
/// the request loop in `serve_agent_rpc`.
confirmation_rx: Arc<Mutex<BufReader<io::Stdin>>>,
}
impl RpcEventSink {
    /// Builds a sink that writes events to `writer` and reads interactive
    /// replies from `reader`.
    fn new(writer: Arc<Mutex<io::Stdout>>, reader: Arc<Mutex<BufReader<io::Stdin>>>) -> Self {
        let confirmation_rx = reader;
        Self {
            writer,
            confirmation_rx,
        }
    }

    /// Writes `{"event": <event>, "data": <data>}` followed by a newline to
    /// stdout and flushes it.
    ///
    /// Delivery is best-effort: if the stdout lock cannot be acquired, or the
    /// write/flush fails, the event is dropped without reporting an error.
    fn emit(&self, event: &str, data: Value) {
        let envelope = json!({ "event": event, "data": data });
        let mut out = match self.writer.lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };
        let _ = writeln!(out, "{}", envelope);
        let _ = out.flush();
    }
}
impl EventSink for RpcEventSink {
fn on_text(&mut self, text: &str) {
self.emit("text", json!({ "text": text }));
}
fn on_text_chunk(&mut self, chunk: &str) {
self.emit("text_chunk", json!({ "text": chunk }));
}
fn on_tool_call(&mut self, name: &str, arguments: &str) {
self.emit("tool_call", json!({ "name": name, "arguments": arguments }));
}
fn on_tool_result(&mut self, name: &str, result: &str, is_error: bool) {
self.emit(
"tool_result",
json!({ "name": name, "result": result, "is_error": is_error }),
);
}
fn on_command_started(&mut self, command: &str) {
self.emit("command_started", json!({ "command": command }));
}
fn on_command_output(&mut self, stream: &str, chunk: &str) {
self.emit(
"command_output",
json!({ "stream": stream, "chunk": chunk }),
);
}
fn on_command_finished(&mut self, success: bool, exit_code: i32, duration_ms: u64) {
self.emit(
"command_finished",
json!({ "success": success, "exit_code": exit_code, "duration_ms": duration_ms }),
);
}
fn on_preview_started(&mut self, path: &str, port: u16) {
self.emit("preview_started", json!({ "path": path, "port": port }));
}
fn on_preview_ready(&mut self, url: &str, port: u16) {
self.emit("preview_ready", json!({ "url": url, "port": port }));
}
fn on_preview_failed(&mut self, message: &str) {
self.emit("preview_failed", json!({ "message": message }));
}
fn on_preview_stopped(&mut self, reason: &str) {
self.emit("preview_stopped", json!({ "reason": reason }));
}
fn on_swarm_started(&mut self, description: &str) {
self.emit("swarm_started", json!({ "description": description }));
}
fn on_swarm_progress(&mut self, status: &str) {
self.emit("swarm_progress", json!({ "status": status }));
}
fn on_swarm_finished(&mut self, summary: &str) {
self.emit("swarm_finished", json!({ "summary": summary }));
}
fn on_swarm_failed(&mut self, message: &str) {
self.emit("swarm_failed", json!({ "message": message }));
}
fn on_confirmation_request(&mut self, prompt: &str) -> bool {
self.emit("confirmation_request", json!({ "prompt": prompt }));
if let Ok(mut reader) = self.confirmation_rx.lock() {
let mut line = String::new();
if reader.read_line(&mut line).is_ok() {
if let Ok(msg) = serde_json::from_str::<Value>(line.trim()) {
if msg.get("method").and_then(|m| m.as_str()) == Some("confirm") {
return msg
.get("params")
.and_then(|p| p.get("approved"))
.and_then(|a| a.as_bool())
.unwrap_or(false);
}
}
}
}
false
}
fn on_clarification_request(
&mut self,
request: &ClarificationRequest,
) -> ClarificationResponse {
self.emit(
"clarification_request",
json!({
"reason": request.reason,
"message": request.message,
"suggestions": request.suggestions,
}),
);
if let Ok(mut reader) = self.confirmation_rx.lock() {
let mut line = String::new();
if reader.read_line(&mut line).is_ok() {
if let Ok(msg) = serde_json::from_str::<Value>(line.trim()) {
if msg.get("method").and_then(|m| m.as_str()) == Some("clarify") {
let params = msg.get("params").cloned().unwrap_or(json!({}));
let action = params
.get("action")
.and_then(|a| a.as_str())
.unwrap_or("stop");
if action == "continue" {
let hint = params
.get("hint")
.and_then(|h| h.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
return ClarificationResponse::Continue(hint);
}
}
}
}
}
ClarificationResponse::Stop
}
fn on_task_plan(&mut self, tasks: &[Task]) {
self.emit("task_plan", json!({ "tasks": tasks }));
}
fn on_task_progress(&mut self, task_id: u32, completed: bool, tasks: &[Task]) {
self.emit(
"task_progress",
json!({ "task_id": task_id, "completed": completed, "tasks": tasks }),
);
}
}
/// Runs the agent RPC loop over stdin/stdout until stdin reaches EOF or a
/// read error occurs.
///
/// Each non-empty input line is parsed as a JSON request with a `method`
/// string and optional `params`. Results are reported as JSON event lines on
/// stdout:
///
/// - `agent_chat`: runs one chat turn via `handle_agent_chat`; a failure is
///   reported as an `error` event.
/// - `ping`: answered with a `pong` event.
/// - `confirm` / `clarify`: ignored at this level.
/// - anything else: reported as an `error` event (`Unknown method: ...`).
///
/// Lines that are not valid JSON produce an `error` event and are skipped.
///
/// # Errors
///
/// Returns an error if the tokio runtime cannot be created or the shared
/// stdin lock is poisoned. A stdin read error is reported as an `error` event
/// and ends the loop with `Ok(())`.
pub fn serve_agent_rpc() -> Result<()> {
    skilllite_core::config::ensure_default_output_dir();
    let stdin = io::stdin();
    let stdout = io::stdout();
    let writer = Arc::new(Mutex::new(stdout));
    let reader_arc = Arc::new(Mutex::new(BufReader::new(stdin)));
    let rt = tokio::runtime::Runtime::new().context("Failed to create tokio runtime")?;
    loop {
        let mut line = String::new();
        {
            // Scope the guard so the lock is released before a request is
            // handled: the event sink locks this same reader to read
            // confirmation/clarification replies during a chat turn.
            let mut reader = reader_arc
                .lock()
                .map_err(|e| anyhow::anyhow!("stdin lock poisoned: {}", e))?;
            match reader.read_line(&mut line) {
                Ok(0) => break,
                Ok(_) => {}
                Err(e) => {
                    emit_event(
                        &writer,
                        "error",
                        json!({ "message": format!("stdin read error: {}", e) }),
                    );
                    break;
                }
            }
        }
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let request: Value = match serde_json::from_str(line) {
            Ok(v) => v,
            Err(e) => {
                emit_event(
                    &writer,
                    "error",
                    json!({ "message": format!("JSON parse error: {}", e) }),
                );
                continue;
            }
        };
        let method = request.get("method").and_then(|m| m.as_str()).unwrap_or("");
        match method {
            "agent_chat" => {
                // Missing `params` is treated as an empty object.
                let empty_params = json!({});
                let params = request.get("params").unwrap_or(&empty_params);
                let writer_clone = Arc::clone(&writer);
                let reader_clone = Arc::clone(&reader_arc);
                if let Err(e) = rt.block_on(handle_agent_chat(params, writer_clone, reader_clone))
                {
                    emit_event(&writer, "error", json!({ "message": e.to_string() }));
                }
            }
            "ping" => {
                emit_event(&writer, "pong", json!({}));
            }
            "confirm" | "clarify" => {
                // Replies to a pending request are read directly by the event
                // sink; one arriving here has no request to answer, so it is
                // dropped silently.
            }
            _ => {
                emit_event(
                    &writer,
                    "error",
                    json!({ "message": format!("Unknown method: {}", method) }),
                );
            }
        }
    }
    Ok(())
}
/// Writes `{"event": <event>, "data": <data>}` followed by a newline to the
/// shared stdout handle and flushes it.
///
/// Delivery is best-effort: a lock failure or I/O error drops the event
/// without reporting anything to the caller.
fn emit_event(writer: &Arc<Mutex<io::Stdout>>, event: &str, data: Value) {
    let envelope = json!({ "event": event, "data": data });
    let mut out = match writer.lock() {
        Ok(guard) => guard,
        Err(_) => return,
    };
    let _ = writeln!(out, "{}", envelope);
    let _ = out.flush();
}
/// Runs a single `agent_chat` turn and reports its outcome as an event.
///
/// Recognised fields of `params`:
///
/// - `message` (string, required): the user message for this turn.
/// - `session_key` (string): session identifier; defaults to `"default"`.
/// - `config` (object): per-request overrides applied on top of
///   `AgentConfig::from_env()`: `model`, `api_base`, `api_key`, `workspace`,
///   `max_iterations`, `enable_task_planning`, `soul_path`,
///   `skip_history_for_planning`. Fields of the wrong JSON type are ignored.
/// - `context.append` (string): stored in `config.context_append`.
/// - `skill_dirs` (array of strings): skill directories to load. When absent,
///   directories are discovered under the configured workspace.
///
/// On success a `done` event carrying the serialized node result is written
/// to `writer`. If the turn itself fails, an `error` event is written and
/// this function still returns `Ok(())`.
///
/// # Errors
///
/// Returns an error if `message` is missing or not a string, or if the
/// resulting API key is empty.
async fn handle_agent_chat(
    params: &Value,
    writer: Arc<Mutex<io::Stdout>>,
    reader: Arc<Mutex<BufReader<io::Stdin>>>,
) -> Result<()> {
    let message = params
        .get("message")
        .and_then(|m| m.as_str())
        .context("'message' is required in agent_chat params")?;
    let session_key = params
        .get("session_key")
        .and_then(|s| s.as_str())
        .unwrap_or("default");
    let mut config = AgentConfig::from_env();
    if let Some(overrides) = params.get("config") {
        if let Some(model) = overrides.get("model").and_then(|v| v.as_str()) {
            config.model = model.to_string();
        }
        if let Some(base) = overrides.get("api_base").and_then(|v| v.as_str()) {
            config.api_base = base.to_string();
        }
        if let Some(key) = overrides.get("api_key").and_then(|v| v.as_str()) {
            config.api_key = key.to_string();
        }
        if let Some(ws) = overrides.get("workspace").and_then(|v| v.as_str()) {
            config.workspace = ws.to_string();
        }
        if let Some(max) = overrides.get("max_iterations").and_then(|v| v.as_u64()) {
            // Clamp before narrowing: a bare `as usize` would silently wrap
            // on targets where `usize` is narrower than 64 bits.
            config.max_iterations = max.min(usize::MAX as u64) as usize;
        }
        if let Some(plan) = overrides
            .get("enable_task_planning")
            .and_then(|v| v.as_bool())
        {
            config.enable_task_planning = plan;
        }
        if let Some(sp) = overrides.get("soul_path").and_then(|v| v.as_str()) {
            config.soul_path = Some(sp.to_string());
        }
        if let Some(skip) = overrides
            .get("skip_history_for_planning")
            .and_then(|v| v.as_bool())
        {
            config.skip_history_for_planning = skip;
        }
    }
    if let Some(ctx) = params
        .get("context")
        .and_then(|c| c.get("append"))
        .and_then(|a| a.as_str())
    {
        config.context_append = Some(ctx.to_string());
    }
    // Checked after overrides so a per-request `api_key` can satisfy it.
    if config.api_key.is_empty() {
        anyhow::bail!("API key required. Set OPENAI_API_KEY env var.");
    }
    let skill_dirs: Vec<String> =
        if let Some(dirs) = params.get("skill_dirs").and_then(|v| v.as_array()) {
            // Non-string entries are skipped.
            dirs.iter()
                .filter_map(|d| d.as_str().map(|s| s.to_string()))
                .collect()
        } else {
            skilllite_core::skill::discovery::discover_skill_dirs_for_loading(
                Path::new(&config.workspace),
                Some(&[".skills", "skills"]),
            )
        };
    let loaded_skills = skills::load_skills(&skill_dirs);
    let mut session = ChatSession::new(config, session_key, loaded_skills);
    let mut sink = RpcEventSink::new(writer.clone(), reader);
    match session.run_turn(message, &mut sink).await {
        Ok(agent_result) => {
            let task_id = Uuid::new_v4().to_string();
            let node_result = agent_result.to_node_result(&task_id);
            // If the node result cannot be serialized, fall back to a
            // hand-built payload.
            let data = serde_json::to_value(&node_result).unwrap_or_else(|_| {
                serde_json::json!({
                    "task_id": task_id,
                    "response": agent_result.response,
                    "task_completed": agent_result.feedback.task_completed,
                    "tool_calls": agent_result.feedback.total_tools,
                    "new_skill": serde_json::Value::Null
                })
            });
            emit_event(&writer, "done", data);
        }
        Err(e) => {
            emit_event(&writer, "error", json!({ "message": e.to_string() }));
        }
    }
    Ok(())
}