Skip to main content

routa_core/acp/
process.rs

1//! AcpProcess — manages a single ACP agent child process with JSON-RPC over stdio.
2//!
3//! The lifecycle mirrors the Next.js `AcpProcess` class:
4//!   1. `spawn(command, args)` — start the child, launch a background stdout reader
5//!   2. `initialize()`         — send "initialize" request, wait for response
6//!   3. `new_session(cwd)`     — send "session/new", get back sessionId
7//!   4. `prompt(sid, text)`    — send "session/prompt" (5-min timeout), stream via SSE
8//!   5. `kill()`               — terminate the process
9//!
10//! Agent→client requests (permissions, fs, terminal) are handled in the background reader.
11//! Agent message notifications are traced to JSONL files for attribution tracking.
12
13use std::collections::HashMap;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
19use tokio::process::{Child, ChildStdin};
20use tokio::sync::{broadcast, oneshot, Mutex};
21
22use super::terminal_manager::TerminalManager;
23use crate::trace::{
24    Contributor, TraceConversation, TraceEventType, TraceRecord, TraceTool, TraceWriter,
25};
26
27/// Callback type for session/update notifications from the agent.
28pub type NotificationSender = broadcast::Sender<serde_json::Value>;
29
30/// Type alias for the pending request map to avoid complex type repetition.
31type PendingMap = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<serde_json::Value, String>>>>>;
32
33/// A managed ACP agent child process.
34pub struct AcpProcess {
35    stdin: Arc<Mutex<ChildStdin>>,
36    child: Arc<Mutex<Option<Child>>>,
37    pending: PendingMap,
38    next_id: Arc<AtomicU64>,
39    alive: Arc<AtomicBool>,
40    notification_tx: NotificationSender,
41    display_name: String,
42    /// The command used to spawn this process (e.g., "npx", "uvx", "opencode")
43    command: String,
44    _reader_handle: tokio::task::JoinHandle<()>,
45}
46
47impl AcpProcess {
48    /// Spawn the agent process and start the background reader.
49    ///
50    /// `our_session_id` is used to rewrite the agent's session ID in notifications
51    /// so the frontend SSE stream matches on the correct session.
52    pub async fn spawn(
53        command: &str,
54        args: &[&str],
55        cwd: &str,
56        notification_tx: NotificationSender,
57        display_name: &str,
58        our_session_id: &str,
59    ) -> Result<Self, String> {
60        tracing::info!(
61            "[AcpProcess:{}] Spawning: {} {} (cwd: {})",
62            display_name,
63            command,
64            args.join(" "),
65            cwd,
66        );
67
68        // Resolve the actual binary path using the full shell PATH
69        // (macOS GUI apps have a minimal PATH that won't find user CLI tools)
70        let resolved_command =
71            crate::shell_env::which(command).unwrap_or_else(|| command.to_string());
72
73        let mut command_builder = tokio::process::Command::new(&resolved_command);
74        command_builder
75            .args(args)
76            .current_dir(cwd)
77            .env("PATH", crate::shell_env::full_path())
78            .env("NODE_NO_READLINE", "1")
79            .stdin(std::process::Stdio::piped())
80            .stdout(std::process::Stdio::piped())
81            .stderr(std::process::Stdio::piped());
82
83        // codex-acp often returns only stopReason in session/prompt result.
84        // Enabling lightweight codex logs gives us process_output lines that
85        // include assistant deltas, which the CLI can aggregate as final output.
86        if resolved_command.ends_with("codex-acp") && std::env::var_os("RUST_LOG").is_none() {
87            command_builder.env(
88                "RUST_LOG",
89                "info,codex_acp::thread=info,codex_acp::codex_agent=info",
90            );
91        }
92
93        let mut child = command_builder.spawn().map_err(|e| {
94            format!(
95                "Failed to spawn '{}' (resolved: '{}'): {}. Is it installed and in PATH?",
96                command, resolved_command, e
97            )
98        })?;
99
100        let stdin = child
101            .stdin
102            .take()
103            .ok_or_else(|| "No stdin on child process".to_string())?;
104        let stdout = child
105            .stdout
106            .take()
107            .ok_or_else(|| "No stdout on child process".to_string())?;
108        let stderr = child.stderr.take();
109
110        let alive = Arc::new(AtomicBool::new(true));
111        let pending: PendingMap = Arc::new(Mutex::new(HashMap::new()));
112        let stdin = Arc::new(Mutex::new(stdin));
113
114        let name = display_name.to_string();
115
116        // Log stderr in background and forward to frontend as process_output
117        if let Some(stderr) = stderr {
118            let name_clone = name.clone();
119            let ntx_stderr = notification_tx.clone();
120            let our_sid_stderr = our_session_id.to_string();
121            tokio::spawn(async move {
122                let reader = BufReader::new(stderr);
123                let mut lines = reader.lines();
124                while let Ok(Some(line)) = lines.next_line().await {
125                    if !line.trim().is_empty() {
126                        tracing::debug!("[AcpProcess:{} stderr] {}", name_clone, line);
127                        // Forward stderr to frontend as process_output notification
128                        let notification = serde_json::json!({
129                            "jsonrpc": "2.0",
130                            "method": "session/update",
131                            "params": {
132                                "sessionId": our_sid_stderr,
133                                "update": {
134                                    "sessionUpdate": "process_output",
135                                    "source": "stderr",
136                                    "data": format!("{}\n", line),
137                                    "displayName": name_clone,
138                                }
139                            }
140                        });
141                        let _ = ntx_stderr.send(notification);
142                    }
143                }
144            });
145        }
146
147        // Background stdout reader — dispatches responses, notifications, agent requests
148        let alive_clone = alive.clone();
149        let pending_clone = pending.clone();
150        let ntx = notification_tx.clone();
151        let stdin_clone = stdin.clone();
152        let name_clone = name.clone();
153        let our_sid = our_session_id.to_string();
154        let cwd_clone = cwd.to_string();
155        let provider_clone = display_name.to_string();
156
157        let reader_handle = tokio::spawn(async move {
158            let reader = BufReader::new(stdout);
159            let mut lines = reader.lines();
160            // Buffer for accumulating agent message chunks
161            let mut agent_msg_buffer = String::new();
162            // Buffer for accumulating agent thought chunks
163            let mut agent_thought_buffer = String::new();
164            // Buffer for pending tool calls awaiting rawInput (OpenCode sends empty rawInput initially)
165            let mut pending_tool_calls: std::collections::HashMap<String, (String, bool)> =
166                std::collections::HashMap::new();
167
168            while let Ok(Some(line)) = lines.next_line().await {
169                let line = line.trim().to_string();
170                if line.is_empty() {
171                    continue;
172                }
173
174                let msg: serde_json::Value = match serde_json::from_str(&line) {
175                    Ok(v) => v,
176                    Err(_) => {
177                        // Try to find embedded JSON objects
178                        if let Some(v) = try_parse_embedded_json(&line) {
179                            v
180                        } else {
181                            tracing::debug!(
182                                "[AcpProcess:{}] Non-JSON stdout: {}",
183                                name_clone,
184                                &line[..line.len().min(200)]
185                            );
186                            continue;
187                        }
188                    }
189                };
190
191                let has_id = msg.get("id").is_some() && !msg.get("id").unwrap().is_null();
192                let has_result = msg.get("result").is_some();
193                let has_error = msg.get("error").is_some();
194                let has_method = msg.get("method").and_then(|m| m.as_str()).is_some();
195
196                if has_id && (has_result || has_error) {
197                    // Response to a pending request
198                    let id = msg["id"].as_u64().unwrap_or(0);
199                    let mut map = pending_clone.lock().await;
200                    if let Some(tx) = map.remove(&id) {
201                        if has_error {
202                            let err_msg =
203                                msg["error"]["message"].as_str().unwrap_or("unknown error");
204                            let err_code = msg["error"]["code"].as_i64().unwrap_or(0);
205                            let _ = tx.send(Err(format!("ACP Error [{}]: {}", err_code, err_msg)));
206                        } else {
207                            let _ = tx.send(Ok(msg["result"].clone()));
208                        }
209                    }
210                } else if has_id && has_method {
211                    // Agent→Client request — handle it
212                    let method = msg["method"].as_str().unwrap_or("");
213                    let id_val = msg["id"].clone();
214                    tracing::info!(
215                        "[AcpProcess:{}] Agent request: {} (id={})",
216                        name_clone,
217                        method,
218                        id_val
219                    );
220                    let response =
221                        handle_agent_request(method, &msg["params"], &our_sid, &ntx).await;
222                    let reply = serde_json::json!({
223                        "jsonrpc": "2.0",
224                        "id": id_val,
225                        "result": response,
226                    });
227                    let data = format!("{}\n", serde_json::to_string(&reply).unwrap());
228                    let mut stdin = stdin_clone.lock().await;
229                    let _ = stdin.write_all(data.as_bytes()).await;
230                    let _ = stdin.flush().await;
231                } else if has_method {
232                    // Notification (no id) — forward to SSE
233                    // Rewrite the agent's sessionId to our session ID so the
234                    // frontend SSE stream can match on the correct session.
235                    let mut rewritten = msg.clone();
236                    if let Some(params) = rewritten.get_mut("params") {
237                        if params.get("sessionId").is_some() {
238                            params["sessionId"] = serde_json::Value::String(our_sid.clone());
239                        }
240                    }
241
242                    // ── Trace: various event types ─────────────────────────────
243                    if let Some(params) = msg.get("params") {
244                        if let Some(update) = params.get("update") {
245                            let session_update = update
246                                .get("sessionUpdate")
247                                .and_then(|v| v.as_str())
248                                .unwrap_or("");
249
250                            match session_update {
251                                "agent_thought_chunk" => {
252                                    // Accumulate thought chunks
253                                    let text = update
254                                        .get("content")
255                                        .and_then(|c| c.get("text"))
256                                        .and_then(|t| t.as_str())
257                                        .unwrap_or("");
258                                    agent_thought_buffer.push_str(text);
259                                    // Trace when buffer reaches 100+ chars
260                                    if agent_thought_buffer.len() >= 100 {
261                                        let record = TraceRecord::new(
262                                            &our_sid,
263                                            TraceEventType::AgentThought,
264                                            Contributor::new(&provider_clone, None),
265                                        )
266                                        .with_conversation(TraceConversation {
267                                            turn: None,
268                                            role: Some("assistant".to_string()),
269                                            content_preview: Some(
270                                                agent_thought_buffer
271                                                    [..agent_thought_buffer.len().min(200)]
272                                                    .to_string(),
273                                            ),
274                                            full_content: Some(agent_thought_buffer.clone()),
275                                        });
276                                        let writer = TraceWriter::new(&cwd_clone);
277                                        let _ = writer.append_safe(&record).await;
278                                        agent_thought_buffer.clear();
279                                    }
280                                }
281                                "agent_message_chunk" => {
282                                    // Accumulate message chunks
283                                    let text = update
284                                        .get("content")
285                                        .and_then(|c| c.get("text"))
286                                        .and_then(|t| t.as_str())
287                                        .unwrap_or("");
288                                    agent_msg_buffer.push_str(text);
289                                    // Trace when buffer reaches 100+ chars
290                                    if agent_msg_buffer.len() >= 100 {
291                                        let record = TraceRecord::new(
292                                            &our_sid,
293                                            TraceEventType::AgentMessage,
294                                            Contributor::new(&provider_clone, None),
295                                        )
296                                        .with_conversation(TraceConversation {
297                                            turn: None,
298                                            role: Some("assistant".to_string()),
299                                            content_preview: Some(
300                                                agent_msg_buffer[..agent_msg_buffer.len().min(200)]
301                                                    .to_string(),
302                                            ),
303                                            full_content: Some(agent_msg_buffer.clone()),
304                                        });
305                                        let writer = TraceWriter::new(&cwd_clone);
306                                        let _ = writer.append_safe(&record).await;
307                                        agent_msg_buffer.clear();
308                                    }
309                                }
310                                "agent_message" => {
311                                    // Full message - trace immediately
312                                    let text = update
313                                        .get("content")
314                                        .and_then(|c| c.get("text"))
315                                        .and_then(|t| t.as_str())
316                                        .unwrap_or("");
317                                    let record = TraceRecord::new(
318                                        &our_sid,
319                                        TraceEventType::AgentMessage,
320                                        Contributor::new(&provider_clone, None),
321                                    )
322                                    .with_conversation(TraceConversation {
323                                        turn: None,
324                                        role: Some("assistant".to_string()),
325                                        content_preview: Some(
326                                            text[..text.len().min(200)].to_string(),
327                                        ),
328                                        full_content: Some(text.to_string()),
329                                    });
330                                    let writer = TraceWriter::new(&cwd_clone);
331                                    let _ = writer.append_safe(&record).await;
332                                }
333                                "tool_call" => {
334                                    // Tool call - OpenCode may send rawInput as empty initially
335                                    let tool_call_id =
336                                        update.get("toolCallId").and_then(|v| v.as_str());
337                                    let kind = update
338                                        .get("kind")
339                                        .and_then(|v| v.as_str())
340                                        .or_else(|| update.get("title").and_then(|v| v.as_str()))
341                                        .unwrap_or("unknown");
342                                    let raw_input = update.get("rawInput").cloned();
343
344                                    // Check if rawInput is empty or null
345                                    let has_input = raw_input.as_ref().is_some_and(|v| {
346                                        if let Some(obj) = v.as_object() {
347                                            !obj.is_empty()
348                                        } else {
349                                            !v.is_null()
350                                        }
351                                    });
352
353                                    if has_input {
354                                        // Trace immediately with full input (Claude Code behavior)
355                                        let record = TraceRecord::new(
356                                            &our_sid,
357                                            TraceEventType::ToolCall,
358                                            Contributor::new(&provider_clone, None),
359                                        )
360                                        .with_tool(TraceTool {
361                                            name: kind.to_string(),
362                                            tool_call_id: tool_call_id.map(|s| s.to_string()),
363                                            status: Some("running".to_string()),
364                                            input: raw_input,
365                                            output: None,
366                                        });
367                                        let writer = TraceWriter::new(&cwd_clone);
368                                        let _ = writer.append_safe(&record).await;
369                                    } else if let Some(id) = tool_call_id {
370                                        // OpenCode behavior: rawInput is empty, wait for tool_call_update
371                                        pending_tool_calls
372                                            .insert(id.to_string(), (kind.to_string(), false));
373                                    }
374                                }
375                                "tool_call_update" => {
376                                    // Tool update - may contain rawInput (OpenCode) or just rawOutput (completion)
377                                    let tool_call_id =
378                                        update.get("toolCallId").and_then(|v| v.as_str());
379                                    let kind = update
380                                        .get("kind")
381                                        .and_then(|v| v.as_str())
382                                        .or_else(|| update.get("title").and_then(|v| v.as_str()))
383                                        .unwrap_or("unknown");
384                                    let raw_input = update.get("rawInput").cloned();
385                                    let raw_output = update
386                                        .get("rawOutput")
387                                        .and_then(|v| v.as_str())
388                                        .map(|s| serde_json::Value::String(s.to_string()))
389                                        .or_else(|| update.get("rawOutput").cloned());
390                                    let status = update
391                                        .get("status")
392                                        .and_then(|v| v.as_str())
393                                        .unwrap_or("completed");
394
395                                    // Check if this update has rawInput and the tool_call wasn't traced yet
396                                    let has_input = raw_input.as_ref().is_some_and(|v| {
397                                        if let Some(obj) = v.as_object() {
398                                            !obj.is_empty()
399                                        } else {
400                                            !v.is_null()
401                                        }
402                                    });
403
404                                    if let Some(id) = tool_call_id {
405                                        if let Some((stored_kind, traced)) =
406                                            pending_tool_calls.get_mut(id)
407                                        {
408                                            if has_input && !*traced {
409                                                // Record the tool_call trace now with actual input
410                                                let record = TraceRecord::new(
411                                                    &our_sid,
412                                                    TraceEventType::ToolCall,
413                                                    Contributor::new(&provider_clone, None),
414                                                )
415                                                .with_tool(TraceTool {
416                                                    name: stored_kind.clone(),
417                                                    tool_call_id: Some(id.to_string()),
418                                                    status: Some("running".to_string()),
419                                                    input: raw_input.clone(),
420                                                    output: None,
421                                                });
422                                                let writer = TraceWriter::new(&cwd_clone);
423                                                let _ = writer.append_safe(&record).await;
424                                                *traced = true;
425                                            }
426                                        }
427                                    }
428
429                                    // Record tool_result trace when status indicates completion or we have output
430                                    let is_complete = status == "completed"
431                                        || status == "failed"
432                                        || raw_output.is_some();
433                                    if is_complete {
434                                        let record = TraceRecord::new(
435                                            &our_sid,
436                                            TraceEventType::ToolResult,
437                                            Contributor::new(&provider_clone, None),
438                                        )
439                                        .with_tool(TraceTool {
440                                            name: kind.to_string(),
441                                            tool_call_id: tool_call_id.map(|s| s.to_string()),
442                                            status: Some(status.to_string()),
443                                            input: None,
444                                            output: raw_output,
445                                        });
446                                        let writer = TraceWriter::new(&cwd_clone);
447                                        let _ = writer.append_safe(&record).await;
448
449                                        // Clean up pending entry
450                                        if let Some(id) = tool_call_id {
451                                            pending_tool_calls.remove(id);
452                                        }
453                                    }
454                                }
455                                _ => {}
456                            }
457                        }
458                    }
459
460                    let _ = ntx.send(rewritten);
461                } else {
462                    tracing::debug!(
463                        "[AcpProcess:{}] Unhandled message: {}",
464                        name_clone,
465                        &line[..line.len().min(200)]
466                    );
467                }
468            }
469
470            // Flush any remaining buffered agent message content
471            if !agent_msg_buffer.is_empty() {
472                let record = TraceRecord::new(
473                    &our_sid,
474                    TraceEventType::AgentMessage,
475                    Contributor::new(&provider_clone, None),
476                )
477                .with_conversation(TraceConversation {
478                    turn: None,
479                    role: Some("assistant".to_string()),
480                    content_preview: Some(
481                        agent_msg_buffer[..agent_msg_buffer.len().min(200)].to_string(),
482                    ),
483                    full_content: Some(agent_msg_buffer.clone()),
484                });
485                let writer = TraceWriter::new(&cwd_clone);
486                let _ = writer.append_safe(&record).await;
487            }
488
489            // Flush any remaining buffered agent thought content
490            if !agent_thought_buffer.is_empty() {
491                let record = TraceRecord::new(
492                    &our_sid,
493                    TraceEventType::AgentThought,
494                    Contributor::new(&provider_clone, None),
495                )
496                .with_conversation(TraceConversation {
497                    turn: None,
498                    role: Some("assistant".to_string()),
499                    content_preview: Some(
500                        agent_thought_buffer[..agent_thought_buffer.len().min(200)].to_string(),
501                    ),
502                    full_content: Some(agent_thought_buffer.clone()),
503                });
504                let writer = TraceWriter::new(&cwd_clone);
505                let _ = writer.append_safe(&record).await;
506            }
507
508            alive_clone.store(false, Ordering::SeqCst);
509            tracing::info!("[AcpProcess:{}] stdout reader finished", name_clone);
510        });
511
512        // Wait briefly for process to stabilize
513        tokio::time::sleep(Duration::from_millis(300)).await;
514
515        if !alive.load(Ordering::SeqCst) {
516            return Err(format!("{} process died during startup", display_name));
517        }
518
519        tracing::info!("[AcpProcess:{}] Process started", display_name);
520
521        Ok(Self {
522            stdin,
523            child: Arc::new(Mutex::new(Some(child))),
524            pending,
525            next_id: Arc::new(AtomicU64::new(1)),
526            alive,
527            notification_tx,
528            display_name: display_name.to_string(),
529            command: command.to_string(),
530            _reader_handle: reader_handle,
531        })
532    }
533
534    /// Whether the process is still alive.
535    pub fn is_alive(&self) -> bool {
536        self.alive.load(Ordering::SeqCst)
537    }
538
539    /// Send a JSON-RPC request and wait for the response.
540    pub async fn send_request(
541        &self,
542        method: &str,
543        params: serde_json::Value,
544        timeout_ms: Option<u64>,
545    ) -> Result<serde_json::Value, String> {
546        if !self.is_alive() {
547            return Err(format!("{} process is not alive", self.display_name));
548        }
549
550        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
551        let (tx, rx) = oneshot::channel();
552
553        self.pending.lock().await.insert(id, tx);
554
555        let msg = serde_json::json!({
556            "jsonrpc": "2.0",
557            "id": id,
558            "method": method,
559            "params": params,
560        });
561        let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
562
563        {
564            let mut stdin = self.stdin.lock().await;
565            stdin
566                .write_all(data.as_bytes())
567                .await
568                .map_err(|e| format!("Write {}: {}", method, e))?;
569            stdin
570                .flush()
571                .await
572                .map_err(|e| format!("Flush {}: {}", method, e))?;
573        }
574
575        // Determine timeout based on method and command type
576        // npx/uvx agents may need longer timeout for first-time package download
577        let is_npx_or_uvx = self.command == "npx" || self.command == "uvx";
578        let default_timeout = match method {
579            "initialize" | "session/new" => {
580                if is_npx_or_uvx {
581                    120_000 // 2 min for npx/uvx (may need to download packages)
582                } else {
583                    15_000 // 15s for others
584                }
585            }
586            "session/prompt" => 300_000, // 5 min
587            _ => 30_000,
588        };
589        let timeout_dur = Duration::from_millis(timeout_ms.unwrap_or(default_timeout));
590
591        match tokio::time::timeout(timeout_dur, rx).await {
592            Ok(Ok(result)) => result,
593            Ok(Err(_)) => Err(format!("Channel closed for {} (id={})", method, id)),
594            Err(_) => {
595                self.pending.lock().await.remove(&id);
596                Err(format!(
597                    "Timeout waiting for {} (id={}, {}ms)",
598                    method,
599                    id,
600                    timeout_dur.as_millis()
601                ))
602            }
603        }
604    }
605
606    /// Initialize the ACP protocol.
607    pub async fn initialize(&self) -> Result<serde_json::Value, String> {
608        self.initialize_with_timeout(None).await
609    }
610
611    /// Initialize the ACP protocol with an optional timeout override.
612    pub async fn initialize_with_timeout(
613        &self,
614        timeout_ms: Option<u64>,
615    ) -> Result<serde_json::Value, String> {
616        let result = self
617            .send_request(
618                "initialize",
619                serde_json::json!({
620                    "protocolVersion": 1,
621                    "clientInfo": {
622                        "name": "routa-desktop",
623                        "version": "0.1.0"
624                    }
625                }),
626                timeout_ms,
627            )
628            .await?;
629        tracing::info!(
630            "[AcpProcess:{}] Initialized: {}",
631            self.display_name,
632            serde_json::to_string(&result).unwrap_or_default()
633        );
634        Ok(result)
635    }
636
637    /// Create a new ACP session. Returns the agent's session ID.
638    pub async fn new_session(&self, cwd: &str) -> Result<String, String> {
639        let result = self
640            .send_request(
641                "session/new",
642                serde_json::json!({
643                    "cwd": cwd,
644                    "mcpServers": []
645                }),
646                None,
647            )
648            .await?;
649
650        let session_id = result["sessionId"]
651            .as_str()
652            .ok_or_else(|| "No sessionId in session/new response".to_string())?
653            .to_string();
654
655        tracing::info!(
656            "[AcpProcess:{}] Session created: {}",
657            self.display_name,
658            session_id
659        );
660        Ok(session_id)
661    }
662
663    /// Send a prompt to an existing session. 5-minute timeout.
664    pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
665        self.send_request(
666            "session/prompt",
667            serde_json::json!({
668                "sessionId": session_id,
669                "prompt": [{ "type": "text", "text": text }]
670            }),
671            Some(300_000),
672        )
673        .await
674    }
675
676    /// Send session/cancel notification (no response expected).
677    pub async fn cancel(&self, session_id: &str) {
678        let msg = serde_json::json!({
679            "jsonrpc": "2.0",
680            "method": "session/cancel",
681            "params": { "sessionId": session_id }
682        });
683        let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
684        let mut stdin = self.stdin.lock().await;
685        let _ = stdin.write_all(data.as_bytes()).await;
686        let _ = stdin.flush().await;
687    }
688
689    /// Get the notification broadcast sender (for subscribing to SSE).
690    pub fn notification_sender(&self) -> &NotificationSender {
691        &self.notification_tx
692    }
693
694    /// Kill the agent process.
695    pub async fn kill(&self) {
696        self.alive.store(false, Ordering::SeqCst);
697        if let Some(mut child) = self.child.lock().await.take() {
698            tracing::info!("[AcpProcess:{}] Killing process", self.display_name);
699            let _ = child.kill().await;
700        }
701        // Reject all pending requests
702        let mut map = self.pending.lock().await;
703        for (_, tx) in map.drain() {
704            let _ = tx.send(Err("Process killed".to_string()));
705        }
706    }
707}
708
709/// Handle agent→client requests. Auto-approves permissions, handles fs ops.
710async fn handle_agent_request(
711    method: &str,
712    params: &serde_json::Value,
713    session_id: &str,
714    notification_tx: &NotificationSender,
715) -> serde_json::Value {
716    match method {
717        "session/request_permission" => {
718            // Auto-approve all permissions
719            serde_json::json!({
720                "outcome": { "outcome": "approved" }
721            })
722        }
723        "fs/read_text_file" => {
724            let path = params["path"].as_str().unwrap_or("");
725            match tokio::fs::read_to_string(path).await {
726                Ok(content) => serde_json::json!({ "content": content }),
727                Err(e) => serde_json::json!({
728                    "error": format!("Failed to read file: {}", e)
729                }),
730            }
731        }
732        "fs/write_text_file" => {
733            let path = params["path"].as_str().unwrap_or("");
734            let content = params["content"].as_str().unwrap_or("");
735            if let Some(parent) = std::path::Path::new(path).parent() {
736                let _ = tokio::fs::create_dir_all(parent).await;
737            }
738            match tokio::fs::write(path, content).await {
739                Ok(_) => serde_json::json!({}),
740                Err(e) => serde_json::json!({
741                    "error": format!("Failed to write file: {}", e)
742                }),
743            }
744        }
745        "terminal/create" => {
746            match TerminalManager::global()
747                .create(params, session_id, notification_tx)
748                .await
749            {
750                Ok(result) => result,
751                Err(error) => serde_json::json!({ "error": error }),
752            }
753        }
754        "terminal/output" => {
755            let terminal_id = params["terminalId"].as_str().unwrap_or("");
756            match TerminalManager::global().get_output(terminal_id).await {
757                Ok(result) => result,
758                Err(error) => serde_json::json!({ "error": error }),
759            }
760        }
761        "terminal/wait_for_exit" => {
762            let terminal_id = params["terminalId"].as_str().unwrap_or("");
763            match TerminalManager::global().wait_for_exit(terminal_id).await {
764                Ok(result) => result,
765                Err(error) => serde_json::json!({ "error": error }),
766            }
767        }
768        "terminal/kill" => {
769            let terminal_id = params["terminalId"].as_str().unwrap_or("");
770            match TerminalManager::global().kill(terminal_id).await {
771                Ok(_) => serde_json::json!({}),
772                Err(error) => serde_json::json!({ "error": error }),
773            }
774        }
775        "terminal/release" => {
776            let terminal_id = params["terminalId"].as_str().unwrap_or("");
777            TerminalManager::global().release(terminal_id).await;
778            serde_json::json!({})
779        }
780        _ => {
781            tracing::warn!("[AcpProcess] Unknown agent request: {}", method);
782            serde_json::json!({})
783        }
784    }
785}
786
787/// Try to find and parse embedded JSON objects in a line.
788fn try_parse_embedded_json(line: &str) -> Option<serde_json::Value> {
789    let mut depth = 0i32;
790    let mut start = None;
791
792    for (i, ch) in line.char_indices() {
793        match ch {
794            '{' => {
795                if depth == 0 {
796                    start = Some(i);
797                }
798                depth += 1;
799            }
800            '}' => {
801                depth -= 1;
802                if depth == 0 {
803                    if let Some(s) = start {
804                        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line[s..=i]) {
805                            return Some(v);
806                        }
807                    }
808                    start = None;
809                }
810            }
811            _ => {}
812        }
813    }
814    None
815}