Skip to main content

skilllite_executor/
rpc.rs

1//! JSON-RPC handlers for executor feature (session, transcript, memory, plan).
2
3use anyhow::{Context, Result};
4use serde_json::{json, Value};
5use std::fs;
6use std::io::Write;
7
8use super::chat_root_for_rpc;
9use super::memory::{ensure_index, index_file, index_path, search_bm25};
10use super::plan::{append_plan, read_latest_plan};
11use super::session::SessionStore;
12use super::transcript::{
13    append_entry, ensure_session_header, read_entries_for_session, transcript_path_today,
14    TranscriptEntry,
15};
16
17pub fn handle_session_create(params: &Value) -> Result<Value> {
18    let p = params.as_object().context("params must be object")?;
19    let session_key = p
20        .get("session_key")
21        .and_then(|v| v.as_str())
22        .context("session_key required")?;
23    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
24
25    let root = chat_root_for_rpc(workspace_path)?;
26    let sessions_path = root.join("sessions.json");
27
28    let mut store = SessionStore::load(&sessions_path)?;
29    let entry = store.create_or_get(session_key);
30    let session_id = entry.session_id.clone();
31    let session_key_out = entry.session_key.clone();
32    let updated_at = entry.updated_at.clone();
33    store.save(&sessions_path)?;
34
35    Ok(json!({
36        "session_id": session_id,
37        "session_key": session_key_out,
38        "updated_at": updated_at,
39    }))
40}
41
42pub fn handle_session_get(params: &Value) -> Result<Value> {
43    let p = params.as_object().context("params must be object")?;
44    let session_key = p
45        .get("session_key")
46        .and_then(|v| v.as_str())
47        .context("session_key required")?;
48    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
49
50    let root = chat_root_for_rpc(workspace_path)?;
51    let store = SessionStore::load(&root.join("sessions.json"))?;
52
53    let entry = store.get(session_key).context("Session not found")?;
54    Ok(json!({
55        "session_id": entry.session_id,
56        "session_key": entry.session_key,
57        "updated_at": entry.updated_at,
58        "input_tokens": entry.input_tokens,
59        "output_tokens": entry.output_tokens,
60        "total_tokens": entry.total_tokens,
61        "context_tokens": entry.context_tokens,
62        "compaction_count": entry.compaction_count,
63    }))
64}
65
66pub fn handle_session_update(params: &Value) -> Result<Value> {
67    let p = params.as_object().context("params must be object")?;
68    let session_key = p
69        .get("session_key")
70        .and_then(|v| v.as_str())
71        .context("session_key required")?;
72    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
73
74    let root = chat_root_for_rpc(workspace_path)?;
75    let sessions_path = root.join("sessions.json");
76    let mut store = SessionStore::load(&sessions_path)?;
77
78    store.update(session_key, |e| {
79        if let Some(v) = p.get("input_tokens").and_then(|v| v.as_u64()) {
80            e.input_tokens = v;
81        }
82        if let Some(v) = p.get("output_tokens").and_then(|v| v.as_u64()) {
83            e.output_tokens = v;
84        }
85        if let Some(v) = p.get("total_tokens").and_then(|v| v.as_u64()) {
86            e.total_tokens = v;
87        }
88        if let Some(v) = p.get("context_tokens").and_then(|v| v.as_u64()) {
89            e.context_tokens = v;
90        }
91        if let Some(v) = p.get("compaction_count").and_then(|v| v.as_u64()) {
92            e.compaction_count = v as u32;
93        }
94    })?;
95    store.save(&sessions_path)?;
96
97    Ok(json!({"ok": true}))
98}
99
100pub fn handle_transcript_append(params: &Value) -> Result<Value> {
101    let p = params.as_object().context("params must be object")?;
102    let session_key = p
103        .get("session_key")
104        .and_then(|v| v.as_str())
105        .context("session_key required")?;
106    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
107    let entry_json = p.get("entry").context("entry required")?;
108
109    let root = chat_root_for_rpc(workspace_path)?;
110    let transcripts_dir = root.join("transcripts");
111    let transcript_path = transcript_path_today(&transcripts_dir, session_key);
112
113    // Accept flexible entry format - try structured first, else append raw line
114    let entry: TranscriptEntry = match serde_json::from_value(entry_json.clone()) {
115        Ok(e) => e,
116        Err(_) => {
117            // Raw append: write the entry as a single JSON line
118            if let Some(parent) = transcript_path.parent() {
119                fs::create_dir_all(parent)?;
120            }
121            let mut file = fs::OpenOptions::new()
122                .create(true)
123                .append(true)
124                .open(&transcript_path)?;
125            use std::io::Write;
126            writeln!(file, "{}", entry_json)?;
127            let _ = file.sync_all();
128            return Ok(json!({"ok": true}));
129        }
130    };
131    append_entry(&transcript_path, &entry)?;
132
133    Ok(json!({
134        "ok": true,
135        "entry_id": entry.entry_id(),
136    }))
137}
138
139pub fn handle_transcript_read(params: &Value) -> Result<Value> {
140    let p = params.as_object().context("params must be object")?;
141    let session_key = p
142        .get("session_key")
143        .and_then(|v| v.as_str())
144        .context("session_key required")?;
145    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
146
147    let root = chat_root_for_rpc(workspace_path)?;
148    let transcripts_dir = root.join("transcripts");
149
150    let entries = read_entries_for_session(&transcripts_dir, session_key)?;
151    let arr: Vec<Value> = entries
152        .into_iter()
153        .map(serde_json::to_value)
154        .filter_map(Result::ok)
155        .collect();
156    Ok(json!(arr))
157}
158
159/// Resolve plans directory. Use chat root for consistency with ChatSession/chat_data.
160fn plans_dir_for_workspace(workspace_path: Option<&str>) -> Result<std::path::PathBuf> {
161    let root = chat_root_for_rpc(workspace_path)?;
162    Ok(root.join("plans"))
163}
164
165/// Write plan to plans/{session_key}-{date}.jsonl (append). OpenClaw-style plan storage.
166/// Each plan is appended as a new line, preserving history (no overwrite).
167pub fn handle_plan_write(params: &Value) -> Result<Value> {
168    let p = params.as_object().context("params must be object")?;
169    let session_key = p
170        .get("session_key")
171        .and_then(|v| v.as_str())
172        .context("session_key required")?;
173    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
174    let task_id = p.get("task_id").and_then(|v| v.as_str()).unwrap_or("");
175    let task = p.get("task").and_then(|v| v.as_str()).unwrap_or("");
176    let task_list = p
177        .get("steps")
178        .or(p.get("task_list"))
179        .context("steps or task_list required")?;
180    let tasks = task_list.as_array().context("steps must be array")?;
181
182    let plans_dir = plans_dir_for_workspace(workspace_path)?;
183
184    let (steps, current_step_id) = task_list_to_plan_steps(tasks)?;
185    let updated_at = chrono::Utc::now().to_rfc3339();
186    let plan_json = json!({
187        "session_key": session_key,
188        "task_id": task_id,
189        "task": task,
190        "steps": steps,
191        "current_step_id": current_step_id,
192        "updated_at": updated_at,
193    });
194
195    append_plan(&plans_dir, session_key, &plan_json)?;
196
197    let text = plan_textify_inner(tasks)?;
198    Ok(json!({"ok": true, "text": text}))
199}
200
201/// Read latest plan from plans/{session_key}-{date}.jsonl (or legacy .json).
202pub fn handle_plan_read(params: &Value) -> Result<Value> {
203    let p = params.as_object().context("params must be object")?;
204    let session_key = p
205        .get("session_key")
206        .and_then(|v| v.as_str())
207        .context("session_key required")?;
208    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
209    let date = p.get("date").and_then(|v| v.as_str());
210
211    let plans_dir = plans_dir_for_workspace(workspace_path)?;
212
213    let plan = read_latest_plan(&plans_dir, session_key, date)?;
214    Ok(plan.unwrap_or(serde_json::Value::Null))
215}
216
217/// Convert task_list to plan steps with status: completed | running | pending
218fn task_list_to_plan_steps(tasks: &[Value]) -> Result<(Vec<Value>, i64)> {
219    let mut steps = Vec::with_capacity(tasks.len());
220    let mut current_step_id: i64 = 0;
221    let mut found_running = false;
222    for (i, task) in tasks.iter().enumerate() {
223        let completed = task
224            .get("completed")
225            .and_then(|v| v.as_bool())
226            .unwrap_or(false);
227        let status = if completed {
228            "completed"
229        } else if !found_running {
230            found_running = true;
231            current_step_id = task
232                .get("id")
233                .and_then(|v| v.as_i64())
234                .unwrap_or((i + 1) as i64);
235            "running"
236        } else {
237            "pending"
238        };
239        let step = json!({
240            "id": task.get("id").unwrap_or(&json!(i + 1)),
241            "description": task.get("description").unwrap_or(&json!("")),
242            "tool_hint": task.get("tool_hint").unwrap_or(&json!(null)),
243            "status": status,
244            "result": task.get("result").unwrap_or(&json!(null)),
245        });
246        steps.push(step);
247    }
248    if current_step_id == 0 && !tasks.is_empty() {
249        current_step_id = tasks
250            .last()
251            .and_then(|t| t.get("id").and_then(|v| v.as_i64()))
252            .unwrap_or(1);
253    }
254    Ok((steps, current_step_id))
255}
256
257fn plan_textify_inner(tasks: &[Value]) -> Result<String> {
258    let mut lines = Vec::with_capacity(tasks.len());
259    for (i, task) in tasks.iter().enumerate() {
260        let desc = task
261            .get("description")
262            .and_then(|v| v.as_str())
263            .unwrap_or("(no description)");
264        let tool_hint = task
265            .get("tool_hint")
266            .and_then(|v| v.as_str())
267            .filter(|s| !s.is_empty());
268        let completed = task
269            .get("completed")
270            .and_then(|v| v.as_bool())
271            .unwrap_or(false);
272        let status = if completed { "✓" } else { "○" };
273        let tool_part = tool_hint.map(|t| format!(" [{}]", t)).unwrap_or_default();
274        lines.push(format!("{}. {} {}{}", i + 1, status, desc, tool_part));
275    }
276    Ok(lines.join("\n"))
277}
278
279pub fn handle_transcript_ensure(params: &Value) -> Result<Value> {
280    let p = params.as_object().context("params must be object")?;
281    let session_key = p
282        .get("session_key")
283        .and_then(|v| v.as_str())
284        .context("session_key required")?;
285    let session_id = p
286        .get("session_id")
287        .and_then(|v| v.as_str())
288        .context("session_id required")?;
289    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
290    let cwd = p.get("cwd").and_then(|v| v.as_str());
291
292    let root = chat_root_for_rpc(workspace_path)?;
293    let transcripts_dir = root.join("transcripts");
294    let transcript_path = transcript_path_today(&transcripts_dir, session_key);
295
296    ensure_session_header(&transcript_path, session_id, cwd)?;
297    Ok(json!({"ok": true}))
298}
299
300pub fn handle_memory_write(params: &Value) -> Result<Value> {
301    let p = params.as_object().context("params must be object")?;
302    let rel_path = p
303        .get("rel_path")
304        .and_then(|v| v.as_str())
305        .context("rel_path required")?;
306    let content = p
307        .get("content")
308        .and_then(|v| v.as_str())
309        .context("content required")?;
310    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
311    let append = p.get("append").and_then(|v| v.as_bool()).unwrap_or(false);
312    let agent_id = p
313        .get("agent_id")
314        .and_then(|v| v.as_str())
315        .unwrap_or("default");
316
317    let root = chat_root_for_rpc(workspace_path)?;
318    let full_path = root.join("memory").join(rel_path);
319
320    if rel_path.is_empty() || rel_path.contains("..") || rel_path.starts_with('/') {
321        anyhow::bail!("Invalid rel_path: must be relative, without ..");
322    }
323
324    if let Some(parent) = full_path.parent() {
325        fs::create_dir_all(parent)?;
326    }
327
328    if append {
329        let mut file = fs::OpenOptions::new()
330            .create(true)
331            .append(true)
332            .open(&full_path)?;
333        file.write_all(content.as_bytes())?;
334    } else {
335        fs::write(&full_path, content)?;
336    }
337
338    // Index into FTS5
339    let index_path = index_path(&root, agent_id);
340    if let Some(parent) = index_path.parent() {
341        fs::create_dir_all(parent)?;
342    }
343    let conn = rusqlite::Connection::open(&index_path)?;
344    ensure_index(&conn)?;
345    let file_content = fs::read_to_string(&full_path).unwrap_or_default();
346    index_file(&conn, rel_path, &file_content)?;
347
348    Ok(json!({"ok": true, "path": rel_path}))
349}
350
351pub fn handle_memory_search(params: &Value) -> Result<Value> {
352    let p = params.as_object().context("params must be object")?;
353    let query = p
354        .get("query")
355        .and_then(|v| v.as_str())
356        .context("query required")?;
357    let limit = p.get("limit").and_then(|v| v.as_u64()).unwrap_or(10) as i64;
358    let workspace_path = p.get("workspace_path").and_then(|v| v.as_str());
359    let agent_id = p
360        .get("agent_id")
361        .and_then(|v| v.as_str())
362        .unwrap_or("default");
363
364    let root = chat_root_for_rpc(workspace_path)?;
365    let idx_path = index_path(&root, agent_id);
366
367    if !idx_path.exists() {
368        return Ok(json!([]));
369    }
370
371    let conn = rusqlite::Connection::open(&idx_path)?;
372    let hits = search_bm25(&conn, query, limit)?;
373
374    let arr: Vec<Value> = hits
375        .iter()
376        .map(|h| {
377            json!({
378                "path": h.path,
379                "chunk_index": h.chunk_index,
380                "content": h.content,
381                "score": h.score,
382            })
383        })
384        .collect();
385    Ok(json!(arr))
386}
387
388pub fn handle_token_count(params: &Value) -> Result<Value> {
389    let p = params.as_object().context("params must be object")?;
390    let text = p
391        .get("text")
392        .and_then(|v| v.as_str())
393        .context("text required")?;
394
395    // Approximate: ~4 chars per token
396    let count = (text.len() as f64 / 4.0).ceil() as u64;
397    Ok(json!({"tokens": count}))
398}
399
400/// Convert plan (task list) JSON to human-readable text.
401/// Plan format: [{"id": 1, "description": "...", "tool_hint": "...", "completed": false}, ...]
402pub fn handle_plan_textify(params: &Value) -> Result<Value> {
403    let p = params.as_object().context("params must be object")?;
404    let plan = p.get("plan").context("plan required")?;
405    let tasks = plan.as_array().context("plan must be array")?;
406    let text = plan_textify_inner(tasks)?;
407    Ok(json!({"text": text}))
408}