1use anyhow::{Context, Result};
63use serde_json::{json, Value};
64use std::io::{self, BufRead, BufReader, Write};
65use std::path::Path;
66use std::sync::{Arc, Mutex};
67use uuid::Uuid;
68
69use super::types::*;
70use super::{chat_session::ChatSession, skills};
71
72struct RpcEventSink {
77 writer: Arc<Mutex<io::Stdout>>,
79 confirmation_rx: Arc<Mutex<BufReader<io::Stdin>>>,
81}
82
83impl RpcEventSink {
84 fn new(writer: Arc<Mutex<io::Stdout>>, reader: Arc<Mutex<BufReader<io::Stdin>>>) -> Self {
85 Self {
86 writer,
87 confirmation_rx: reader,
88 }
89 }
90
91 fn emit(&self, event: &str, data: Value) {
92 let msg = json!({ "event": event, "data": data });
93 if let Ok(mut w) = self.writer.lock() {
94 let _ = writeln!(w, "{}", msg);
95 let _ = w.flush();
96 }
97 }
98}
99
100impl EventSink for RpcEventSink {
101 fn on_text(&mut self, text: &str) {
102 self.emit("text", json!({ "text": text }));
103 }
104
105 fn on_text_chunk(&mut self, chunk: &str) {
106 self.emit("text_chunk", json!({ "text": chunk }));
107 }
108
109 fn on_tool_call(&mut self, name: &str, arguments: &str) {
110 self.emit("tool_call", json!({ "name": name, "arguments": arguments }));
111 }
112
113 fn on_tool_result(&mut self, name: &str, result: &str, is_error: bool) {
114 self.emit(
115 "tool_result",
116 json!({ "name": name, "result": result, "is_error": is_error }),
117 );
118 }
119
120 fn on_command_started(&mut self, command: &str) {
121 self.emit("command_started", json!({ "command": command }));
122 }
123
124 fn on_command_output(&mut self, stream: &str, chunk: &str) {
125 self.emit(
126 "command_output",
127 json!({ "stream": stream, "chunk": chunk }),
128 );
129 }
130
131 fn on_command_finished(&mut self, success: bool, exit_code: i32, duration_ms: u64) {
132 self.emit(
133 "command_finished",
134 json!({ "success": success, "exit_code": exit_code, "duration_ms": duration_ms }),
135 );
136 }
137
138 fn on_preview_started(&mut self, path: &str, port: u16) {
139 self.emit("preview_started", json!({ "path": path, "port": port }));
140 }
141
142 fn on_preview_ready(&mut self, url: &str, port: u16) {
143 self.emit("preview_ready", json!({ "url": url, "port": port }));
144 }
145
146 fn on_preview_failed(&mut self, message: &str) {
147 self.emit("preview_failed", json!({ "message": message }));
148 }
149
150 fn on_preview_stopped(&mut self, reason: &str) {
151 self.emit("preview_stopped", json!({ "reason": reason }));
152 }
153
154 fn on_swarm_started(&mut self, description: &str) {
155 self.emit("swarm_started", json!({ "description": description }));
156 }
157
158 fn on_swarm_progress(&mut self, status: &str) {
159 self.emit("swarm_progress", json!({ "status": status }));
160 }
161
162 fn on_swarm_finished(&mut self, summary: &str) {
163 self.emit("swarm_finished", json!({ "summary": summary }));
164 }
165
166 fn on_swarm_failed(&mut self, message: &str) {
167 self.emit("swarm_failed", json!({ "message": message }));
168 }
169
170 fn on_confirmation_request(&mut self, prompt: &str) -> bool {
171 self.emit("confirmation_request", json!({ "prompt": prompt }));
172
173 if let Ok(mut reader) = self.confirmation_rx.lock() {
174 let mut line = String::new();
175 if reader.read_line(&mut line).is_ok() {
176 if let Ok(msg) = serde_json::from_str::<Value>(line.trim()) {
177 if msg.get("method").and_then(|m| m.as_str()) == Some("confirm") {
178 return msg
179 .get("params")
180 .and_then(|p| p.get("approved"))
181 .and_then(|a| a.as_bool())
182 .unwrap_or(false);
183 }
184 }
185 }
186 }
187 false
188 }
189
190 fn on_clarification_request(
191 &mut self,
192 request: &ClarificationRequest,
193 ) -> ClarificationResponse {
194 self.emit(
195 "clarification_request",
196 json!({
197 "reason": request.reason,
198 "message": request.message,
199 "suggestions": request.suggestions,
200 }),
201 );
202
203 if let Ok(mut reader) = self.confirmation_rx.lock() {
204 let mut line = String::new();
205 if reader.read_line(&mut line).is_ok() {
206 if let Ok(msg) = serde_json::from_str::<Value>(line.trim()) {
207 if msg.get("method").and_then(|m| m.as_str()) == Some("clarify") {
208 let params = msg.get("params").cloned().unwrap_or(json!({}));
209 let action = params
210 .get("action")
211 .and_then(|a| a.as_str())
212 .unwrap_or("stop");
213 if action == "continue" {
214 let hint = params
215 .get("hint")
216 .and_then(|h| h.as_str())
217 .filter(|s| !s.is_empty())
218 .map(|s| s.to_string());
219 return ClarificationResponse::Continue(hint);
220 }
221 }
222 }
223 }
224 }
225 ClarificationResponse::Stop
226 }
227
228 fn on_task_plan(&mut self, tasks: &[Task]) {
229 self.emit("task_plan", json!({ "tasks": tasks }));
230 }
231
232 fn on_task_progress(&mut self, task_id: u32, completed: bool, tasks: &[Task]) {
233 self.emit(
234 "task_progress",
235 json!({ "task_id": task_id, "completed": completed, "tasks": tasks }),
236 );
237 }
238}
239
240pub fn serve_agent_rpc() -> Result<()> {
247 skilllite_core::config::ensure_default_output_dir();
248
249 let stdin = io::stdin();
250 let stdout = io::stdout();
251 let writer = Arc::new(Mutex::new(stdout));
252 let reader_arc = Arc::new(Mutex::new(BufReader::new(stdin)));
253
254 let rt = tokio::runtime::Runtime::new().context("Failed to create tokio runtime")?;
255
256 loop {
257 let mut line = String::new();
258 {
259 let mut reader = reader_arc
260 .lock()
261 .map_err(|e| anyhow::anyhow!("stdin lock poisoned: {}", e))?;
262 match reader.read_line(&mut line) {
263 Ok(0) => break,
264 Ok(_) => {}
265 Err(e) => {
266 emit_event(
267 &writer,
268 "error",
269 json!({ "message": format!("stdin read error: {}", e) }),
270 );
271 break;
272 }
273 }
274 }
275
276 let line = line.trim();
277 if line.is_empty() {
278 continue;
279 }
280
281 let request: Value = match serde_json::from_str(line) {
282 Ok(v) => v,
283 Err(e) => {
284 emit_event(
285 &writer,
286 "error",
287 json!({ "message": format!("JSON parse error: {}", e) }),
288 );
289 continue;
290 }
291 };
292
293 let method = request.get("method").and_then(|m| m.as_str()).unwrap_or("");
294 let params = request.get("params").cloned().unwrap_or(json!({}));
295
296 match method {
297 "agent_chat" => {
298 let writer_clone = Arc::clone(&writer);
299 let reader_clone = Arc::clone(&reader_arc);
300 if let Err(e) = rt.block_on(handle_agent_chat(¶ms, writer_clone, reader_clone))
301 {
302 emit_event(&writer, "error", json!({ "message": e.to_string() }));
303 }
304 }
305 "ping" => {
306 emit_event(&writer, "pong", json!({}));
307 }
308 "confirm" | "clarify" => {
309 }
312 _ => {
313 emit_event(
314 &writer,
315 "error",
316 json!({ "message": format!("Unknown method: {}", method) }),
317 );
318 }
319 }
320 }
321
322 Ok(())
323}
324
325fn emit_event(writer: &Arc<Mutex<io::Stdout>>, event: &str, data: Value) {
326 let msg = json!({ "event": event, "data": data });
327 if let Ok(mut w) = writer.lock() {
328 let _ = writeln!(w, "{}", msg);
329 let _ = w.flush();
330 }
331}
332
333async fn handle_agent_chat(
334 params: &Value,
335 writer: Arc<Mutex<io::Stdout>>,
336 reader: Arc<Mutex<BufReader<io::Stdin>>>,
337) -> Result<()> {
338 let message = params
339 .get("message")
340 .and_then(|m| m.as_str())
341 .context("'message' is required in agent_chat params")?;
342 let session_key = params
343 .get("session_key")
344 .and_then(|s| s.as_str())
345 .unwrap_or("default");
346
347 let mut config = AgentConfig::from_env();
348 if let Some(overrides) = params.get("config") {
349 if let Some(model) = overrides.get("model").and_then(|v| v.as_str()) {
350 config.model = model.to_string();
351 }
352 if let Some(base) = overrides.get("api_base").and_then(|v| v.as_str()) {
353 config.api_base = base.to_string();
354 }
355 if let Some(key) = overrides.get("api_key").and_then(|v| v.as_str()) {
356 config.api_key = key.to_string();
357 }
358 if let Some(ws) = overrides.get("workspace").and_then(|v| v.as_str()) {
359 config.workspace = ws.to_string();
360 }
361 if let Some(max) = overrides.get("max_iterations").and_then(|v| v.as_u64()) {
362 config.max_iterations = max as usize;
363 }
364 if let Some(plan) = overrides
365 .get("enable_task_planning")
366 .and_then(|v| v.as_bool())
367 {
368 config.enable_task_planning = plan;
369 }
370 if let Some(sp) = overrides.get("soul_path").and_then(|v| v.as_str()) {
371 config.soul_path = Some(sp.to_string());
372 }
373 if let Some(skip) = overrides
374 .get("skip_history_for_planning")
375 .and_then(|v| v.as_bool())
376 {
377 config.skip_history_for_planning = skip;
378 }
379 }
380 if let Some(ctx) = params
382 .get("context")
383 .and_then(|c| c.get("append"))
384 .and_then(|a| a.as_str())
385 {
386 config.context_append = Some(ctx.to_string());
387 }
388
389 if config.api_key.is_empty() {
390 anyhow::bail!("API key required. Set OPENAI_API_KEY env var.");
391 }
392
393 let skill_dirs: Vec<String> =
394 if let Some(dirs) = params.get("skill_dirs").and_then(|v| v.as_array()) {
395 dirs.iter()
396 .filter_map(|d| d.as_str().map(|s| s.to_string()))
397 .collect()
398 } else {
399 skilllite_core::skill::discovery::discover_skill_dirs_for_loading(
400 Path::new(&config.workspace),
401 Some(&[".skills", "skills"]),
402 )
403 };
404
405 let loaded_skills = skills::load_skills(&skill_dirs);
406
407 let mut session = ChatSession::new(config, session_key, loaded_skills);
408 let mut sink = RpcEventSink::new(writer.clone(), reader);
409
410 match session.run_turn(message, &mut sink).await {
411 Ok(agent_result) => {
412 let task_id = Uuid::new_v4().to_string();
413 let node_result = agent_result.to_node_result(&task_id);
414 let data = serde_json::to_value(&node_result).unwrap_or_else(|_| {
415 serde_json::json!({
416 "task_id": task_id,
417 "response": agent_result.response,
418 "task_completed": agent_result.feedback.task_completed,
419 "tool_calls": agent_result.feedback.total_tools,
420 "new_skill": serde_json::Value::Null
421 })
422 });
423 emit_event(&writer, "done", data);
424 }
425 Err(e) => {
426 emit_event(&writer, "error", json!({ "message": e.to_string() }));
427 }
428 }
429
430 Ok(())
431}