Skip to main content

skilllite_agent/
rpc.rs

1//! Agent Chat RPC: JSON-Lines event stream protocol over stdio.
2//!
3//! **Entry**: `skilllite agent-rpc`
4//!
5//! **Scope**: Agent chat streaming only. One request → many event lines (text_chunk, tool_call,
6//! done, etc.). Supports confirmation round-trips. Uses tokio for async execution.
7//!
8//! **Not this module**: For skill execution (run/exec/bash) and executor RPC (JSON-RPC 2.0,
9//! one request → one response), see [`crate::stdio_rpc`]. That uses `skilllite serve --stdio`.
10//!
11//! This module belongs to the agent layer (Layer 3). It provides a transport-agnostic RPC
12//! interface for Python/TypeScript SDKs to call the Rust agent engine.
13//!
14//! Protocol:
15//!
16//! Request (one JSON line on stdin):
17//! ```json
18//! {"method": "agent_chat", "params": {
19//!     "message": "user input",
20//!     "session_key": "default",
21//!     "context": { "append": "optional string to append to system prompt" },
22//!     "config": { "model": "gpt-4o", ... }  // optional overrides
23//! }}
24//! ```
25//!
26//! Response (multiple JSON lines on stdout):
27//! ```json
28//! {"event": "text_chunk", "data": {"text": "Hello"}}
29//! {"event": "text", "data": {"text": "Hello, how can I help?"}}
30//! {"event": "tool_call", "data": {"name": "read_file", "arguments": "{...}"}}
31//! {"event": "tool_result", "data": {"name": "read_file", "result": "...", "is_error": false}}
32//! {"event": "command_started", "data": {"command": "echo hello"}}
33//! {"event": "command_output", "data": {"stream": "stdout", "chunk": "line"}}
34//! {"event": "command_finished", "data": {"success": true, "exit_code": 0, "duration_ms": 123}}
35//! {"event": "preview_started", "data": {"path": "dist", "port": 8765}}
36//! {"event": "preview_ready", "data": {"url": "http://127.0.0.1:8765", "port": 8765}}
37//! {"event": "preview_failed", "data": {"message": "port already in use"}}
38//! {"event": "preview_stopped", "data": {"reason": "manual stop"}}
39//! {"event": "swarm_started", "data": {"description": "delegate task"}}
40//! {"event": "swarm_progress", "data": {"status": "submitting task"}}
41//! {"event": "swarm_finished", "data": {"summary": "remote node completed task"}}
42//! {"event": "swarm_failed", "data": {"message": "timeout, fallback to local execution"}}
43//! {"event": "task_plan", "data": {"tasks": [...]}}
44//! {"event": "task_progress", "data": {"task_id": 1, "completed": true}}
45//! {"event": "confirmation_request", "data": {"prompt": "Execute rm -rf?"}}
46//! {"event": "clarification_request", "data": {"reason": "no_progress", "message": "...", "suggestions": ["...", "..."]}}
47//! {"event": "done", "data": {"task_id": "...", "response": "...", "task_completed": true, "tool_calls": 3, "new_skill": null}}
48//! {"event": "error", "data": {"message": "..."}}
49//! ```
50//!
51//! For confirmation_request, the caller sends back:
52//! ```json
53//! {"method": "confirm", "params": {"approved": true}}
54//! ```
55//!
56//! For clarification_request, the caller sends back:
57//! ```json
58//! {"method": "clarify", "params": {"action": "continue", "hint": "optional user input"}}
59//! ```
60//! or `{"method": "clarify", "params": {"action": "stop"}}`
61
62use anyhow::{Context, Result};
63use serde_json::{json, Value};
64use std::io::{self, BufRead, BufReader, Write};
65use std::path::Path;
66use std::sync::{Arc, Mutex};
67use uuid::Uuid;
68
69use super::types::*;
70use super::{chat_session::ChatSession, skills};
71
72// ─── RPC Event Sink ─────────────────────────────────────────────────────────
73
74/// EventSink that writes JSON-Lines events to stdout.
75/// Used by the agent_chat RPC protocol.
76struct RpcEventSink {
77    /// Shared writer for thread safety
78    writer: Arc<Mutex<io::Stdout>>,
79    /// Shared reader handle for confirmation prompts
80    confirmation_rx: Arc<Mutex<BufReader<io::Stdin>>>,
81}
82
83impl RpcEventSink {
84    fn new(writer: Arc<Mutex<io::Stdout>>, reader: Arc<Mutex<BufReader<io::Stdin>>>) -> Self {
85        Self {
86            writer,
87            confirmation_rx: reader,
88        }
89    }
90
91    fn emit(&self, event: &str, data: Value) {
92        let msg = json!({ "event": event, "data": data });
93        if let Ok(mut w) = self.writer.lock() {
94            let _ = writeln!(w, "{}", msg);
95            let _ = w.flush();
96        }
97    }
98}
99
100impl EventSink for RpcEventSink {
101    fn on_text(&mut self, text: &str) {
102        self.emit("text", json!({ "text": text }));
103    }
104
105    fn on_text_chunk(&mut self, chunk: &str) {
106        self.emit("text_chunk", json!({ "text": chunk }));
107    }
108
109    fn on_tool_call(&mut self, name: &str, arguments: &str) {
110        self.emit("tool_call", json!({ "name": name, "arguments": arguments }));
111    }
112
113    fn on_tool_result(&mut self, name: &str, result: &str, is_error: bool) {
114        self.emit(
115            "tool_result",
116            json!({ "name": name, "result": result, "is_error": is_error }),
117        );
118    }
119
120    fn on_command_started(&mut self, command: &str) {
121        self.emit("command_started", json!({ "command": command }));
122    }
123
124    fn on_command_output(&mut self, stream: &str, chunk: &str) {
125        self.emit(
126            "command_output",
127            json!({ "stream": stream, "chunk": chunk }),
128        );
129    }
130
131    fn on_command_finished(&mut self, success: bool, exit_code: i32, duration_ms: u64) {
132        self.emit(
133            "command_finished",
134            json!({ "success": success, "exit_code": exit_code, "duration_ms": duration_ms }),
135        );
136    }
137
138    fn on_preview_started(&mut self, path: &str, port: u16) {
139        self.emit("preview_started", json!({ "path": path, "port": port }));
140    }
141
142    fn on_preview_ready(&mut self, url: &str, port: u16) {
143        self.emit("preview_ready", json!({ "url": url, "port": port }));
144    }
145
146    fn on_preview_failed(&mut self, message: &str) {
147        self.emit("preview_failed", json!({ "message": message }));
148    }
149
150    fn on_preview_stopped(&mut self, reason: &str) {
151        self.emit("preview_stopped", json!({ "reason": reason }));
152    }
153
154    fn on_swarm_started(&mut self, description: &str) {
155        self.emit("swarm_started", json!({ "description": description }));
156    }
157
158    fn on_swarm_progress(&mut self, status: &str) {
159        self.emit("swarm_progress", json!({ "status": status }));
160    }
161
162    fn on_swarm_finished(&mut self, summary: &str) {
163        self.emit("swarm_finished", json!({ "summary": summary }));
164    }
165
166    fn on_swarm_failed(&mut self, message: &str) {
167        self.emit("swarm_failed", json!({ "message": message }));
168    }
169
170    fn on_confirmation_request(&mut self, prompt: &str) -> bool {
171        self.emit("confirmation_request", json!({ "prompt": prompt }));
172
173        if let Ok(mut reader) = self.confirmation_rx.lock() {
174            let mut line = String::new();
175            if reader.read_line(&mut line).is_ok() {
176                if let Ok(msg) = serde_json::from_str::<Value>(line.trim()) {
177                    if msg.get("method").and_then(|m| m.as_str()) == Some("confirm") {
178                        return msg
179                            .get("params")
180                            .and_then(|p| p.get("approved"))
181                            .and_then(|a| a.as_bool())
182                            .unwrap_or(false);
183                    }
184                }
185            }
186        }
187        false
188    }
189
190    fn on_clarification_request(
191        &mut self,
192        request: &ClarificationRequest,
193    ) -> ClarificationResponse {
194        self.emit(
195            "clarification_request",
196            json!({
197                "reason": request.reason,
198                "message": request.message,
199                "suggestions": request.suggestions,
200            }),
201        );
202
203        if let Ok(mut reader) = self.confirmation_rx.lock() {
204            let mut line = String::new();
205            if reader.read_line(&mut line).is_ok() {
206                if let Ok(msg) = serde_json::from_str::<Value>(line.trim()) {
207                    if msg.get("method").and_then(|m| m.as_str()) == Some("clarify") {
208                        let params = msg.get("params").cloned().unwrap_or(json!({}));
209                        let action = params
210                            .get("action")
211                            .and_then(|a| a.as_str())
212                            .unwrap_or("stop");
213                        if action == "continue" {
214                            let hint = params
215                                .get("hint")
216                                .and_then(|h| h.as_str())
217                                .filter(|s| !s.is_empty())
218                                .map(|s| s.to_string());
219                            return ClarificationResponse::Continue(hint);
220                        }
221                    }
222                }
223            }
224        }
225        ClarificationResponse::Stop
226    }
227
228    fn on_task_plan(&mut self, tasks: &[Task]) {
229        self.emit("task_plan", json!({ "tasks": tasks }));
230    }
231
232    fn on_task_progress(&mut self, task_id: u32, completed: bool, tasks: &[Task]) {
233        self.emit(
234            "task_progress",
235            json!({ "task_id": task_id, "completed": completed, "tasks": tasks }),
236        );
237    }
238}
239
240// ─── RPC Server ─────────────────────────────────────────────────────────────
241
242/// Run the agent_chat RPC server over stdio.
243///
244/// Reads JSON-Lines from stdin, processes agent_chat requests,
245/// streams events as JSON-Lines to stdout.
246pub fn serve_agent_rpc() -> Result<()> {
247    skilllite_core::config::ensure_default_output_dir();
248
249    let stdin = io::stdin();
250    let stdout = io::stdout();
251    let writer = Arc::new(Mutex::new(stdout));
252    let reader_arc = Arc::new(Mutex::new(BufReader::new(stdin)));
253
254    let rt = tokio::runtime::Runtime::new().context("Failed to create tokio runtime")?;
255
256    loop {
257        let mut line = String::new();
258        {
259            let mut reader = reader_arc
260                .lock()
261                .map_err(|e| anyhow::anyhow!("stdin lock poisoned: {}", e))?;
262            match reader.read_line(&mut line) {
263                Ok(0) => break,
264                Ok(_) => {}
265                Err(e) => {
266                    emit_event(
267                        &writer,
268                        "error",
269                        json!({ "message": format!("stdin read error: {}", e) }),
270                    );
271                    break;
272                }
273            }
274        }
275
276        let line = line.trim();
277        if line.is_empty() {
278            continue;
279        }
280
281        let request: Value = match serde_json::from_str(line) {
282            Ok(v) => v,
283            Err(e) => {
284                emit_event(
285                    &writer,
286                    "error",
287                    json!({ "message": format!("JSON parse error: {}", e) }),
288                );
289                continue;
290            }
291        };
292
293        let method = request.get("method").and_then(|m| m.as_str()).unwrap_or("");
294        let params = request.get("params").cloned().unwrap_or(json!({}));
295
296        match method {
297            "agent_chat" => {
298                let writer_clone = Arc::clone(&writer);
299                let reader_clone = Arc::clone(&reader_arc);
300                if let Err(e) = rt.block_on(handle_agent_chat(&params, writer_clone, reader_clone))
301                {
302                    emit_event(&writer, "error", json!({ "message": e.to_string() }));
303                }
304            }
305            "ping" => {
306                emit_event(&writer, "pong", json!({}));
307            }
308            "confirm" | "clarify" => {
309                // 进程管理端在 confirmation_request / clarification_request 后发送响应;
310                // 若 agent_chat 已结束,主循环会读到滞后的消息。静默忽略。
311            }
312            _ => {
313                emit_event(
314                    &writer,
315                    "error",
316                    json!({ "message": format!("Unknown method: {}", method) }),
317                );
318            }
319        }
320    }
321
322    Ok(())
323}
324
325fn emit_event(writer: &Arc<Mutex<io::Stdout>>, event: &str, data: Value) {
326    let msg = json!({ "event": event, "data": data });
327    if let Ok(mut w) = writer.lock() {
328        let _ = writeln!(w, "{}", msg);
329        let _ = w.flush();
330    }
331}
332
333async fn handle_agent_chat(
334    params: &Value,
335    writer: Arc<Mutex<io::Stdout>>,
336    reader: Arc<Mutex<BufReader<io::Stdin>>>,
337) -> Result<()> {
338    let message = params
339        .get("message")
340        .and_then(|m| m.as_str())
341        .context("'message' is required in agent_chat params")?;
342    let session_key = params
343        .get("session_key")
344        .and_then(|s| s.as_str())
345        .unwrap_or("default");
346
347    let mut config = AgentConfig::from_env();
348    if let Some(overrides) = params.get("config") {
349        if let Some(model) = overrides.get("model").and_then(|v| v.as_str()) {
350            config.model = model.to_string();
351        }
352        if let Some(base) = overrides.get("api_base").and_then(|v| v.as_str()) {
353            config.api_base = base.to_string();
354        }
355        if let Some(key) = overrides.get("api_key").and_then(|v| v.as_str()) {
356            config.api_key = key.to_string();
357        }
358        if let Some(ws) = overrides.get("workspace").and_then(|v| v.as_str()) {
359            config.workspace = ws.to_string();
360        }
361        if let Some(max) = overrides.get("max_iterations").and_then(|v| v.as_u64()) {
362            config.max_iterations = max as usize;
363        }
364        if let Some(plan) = overrides
365            .get("enable_task_planning")
366            .and_then(|v| v.as_bool())
367        {
368            config.enable_task_planning = plan;
369        }
370        if let Some(sp) = overrides.get("soul_path").and_then(|v| v.as_str()) {
371            config.soul_path = Some(sp.to_string());
372        }
373        if let Some(skip) = overrides
374            .get("skip_history_for_planning")
375            .and_then(|v| v.as_bool())
376        {
377            config.skip_history_for_planning = skip;
378        }
379    }
380    // params.context.append — was documented but not parsed
381    if let Some(ctx) = params
382        .get("context")
383        .and_then(|c| c.get("append"))
384        .and_then(|a| a.as_str())
385    {
386        config.context_append = Some(ctx.to_string());
387    }
388
389    if config.api_key.is_empty() {
390        anyhow::bail!("API key required. Set OPENAI_API_KEY env var.");
391    }
392
393    let skill_dirs: Vec<String> =
394        if let Some(dirs) = params.get("skill_dirs").and_then(|v| v.as_array()) {
395            dirs.iter()
396                .filter_map(|d| d.as_str().map(|s| s.to_string()))
397                .collect()
398        } else {
399            skilllite_core::skill::discovery::discover_skill_dirs_for_loading(
400                Path::new(&config.workspace),
401                Some(&[".skills", "skills"]),
402            )
403        };
404
405    let loaded_skills = skills::load_skills(&skill_dirs);
406
407    let mut session = ChatSession::new(config, session_key, loaded_skills);
408    let mut sink = RpcEventSink::new(writer.clone(), reader);
409
410    match session.run_turn(message, &mut sink).await {
411        Ok(agent_result) => {
412            let task_id = Uuid::new_v4().to_string();
413            let node_result = agent_result.to_node_result(&task_id);
414            let data = serde_json::to_value(&node_result).unwrap_or_else(|_| {
415                serde_json::json!({
416                    "task_id": task_id,
417                    "response": agent_result.response,
418                    "task_completed": agent_result.feedback.task_completed,
419                    "tool_calls": agent_result.feedback.total_tools,
420                    "new_skill": serde_json::Value::Null
421                })
422            });
423            emit_event(&writer, "done", data);
424        }
425        Err(e) => {
426            emit_event(&writer, "error", json!({ "message": e.to_string() }));
427        }
428    }
429
430    Ok(())
431}