Skip to main content

routa_core/acp/
process.rs

1//! AcpProcess — manages a single ACP agent child process with JSON-RPC over stdio.
2//!
3//! The lifecycle mirrors the Next.js `AcpProcess` class:
4//!   1. `spawn(command, args)` — start the child, launch a background stdout reader
5//!   2. `initialize()`         — send "initialize" request, wait for response
6//!   3. `new_session(cwd)`     — send "session/new", get back sessionId
7//!   4. `prompt(sid, text)`    — send "session/prompt" (5-min timeout), stream via SSE
8//!   5. `kill()`               — terminate the process
9//!
10//! Agent→client requests (permissions, fs, terminal) are handled in the background reader.
11//! Agent message notifications are traced to JSONL files for attribution tracking.
12
13use std::collections::HashMap;
14use std::io::ErrorKind;
15#[cfg(windows)]
16use std::os::windows::process::CommandExt;
17use std::path::Path;
18use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19use std::sync::Arc;
20use std::time::Duration;
21
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::process::{Child, ChildStdin};
24use tokio::sync::{broadcast, oneshot, Mutex};
25
26use super::terminal_manager::TerminalManager;
27#[cfg(windows)]
28use super::CREATE_NO_WINDOW;
29use crate::trace::{
30    Contributor, TraceConversation, TraceEventType, TraceRecord, TraceTool, TraceWriter,
31};
32
33/// Callback type for session/update notifications from the agent.
34pub type NotificationSender = broadcast::Sender<serde_json::Value>;
35
36/// Type alias for the pending request map to avoid complex type repetition.
37type PendingMap = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<serde_json::Value, String>>>>>;
38
39/// A managed ACP agent child process.
40pub struct AcpProcess {
41    stdin: Arc<Mutex<ChildStdin>>,
42    child: Arc<Mutex<Option<Child>>>,
43    pending: PendingMap,
44    next_id: Arc<AtomicU64>,
45    alive: Arc<AtomicBool>,
46    notification_tx: NotificationSender,
47    display_name: String,
48    /// The command used to spawn this process (e.g., "npx", "uvx", "opencode")
49    command: String,
50    _reader_handle: tokio::task::JoinHandle<()>,
51}
52
53impl AcpProcess {
54    /// Spawn the agent process and start the background reader.
55    ///
56    /// `our_session_id` is used to rewrite the agent's session ID in notifications
57    /// so the frontend SSE stream matches on the correct session.
58    pub async fn spawn(
59        command: &str,
60        args: &[&str],
61        cwd: &str,
62        notification_tx: NotificationSender,
63        display_name: &str,
64        our_session_id: &str,
65    ) -> Result<Self, String> {
66        tracing::info!(
67            "[AcpProcess:{}] Spawning: {} {} (cwd: {})",
68            display_name,
69            command,
70            args.join(" "),
71            cwd,
72        );
73
74        let cwd_path = Path::new(cwd);
75        if !cwd_path.exists() {
76            return Err(format!(
77                "Invalid session cwd '{cwd}': directory does not exist"
78            ));
79        }
80        if !cwd_path.is_dir() {
81            return Err(format!(
82                "Invalid session cwd '{cwd}': path is not a directory"
83            ));
84        }
85
86        // Resolve the actual binary path using the full shell PATH
87        // (macOS GUI apps have a minimal PATH that won't find user CLI tools)
88        let resolved_command =
89            crate::shell_env::which(command).unwrap_or_else(|| command.to_string());
90
91        let mut command_builder = tokio::process::Command::new(&resolved_command);
92        command_builder
93            .args(args)
94            .current_dir(cwd)
95            .env("PATH", crate::shell_env::full_path())
96            .env("NODE_NO_READLINE", "1")
97            .stdin(std::process::Stdio::piped())
98            .stdout(std::process::Stdio::piped())
99            .stderr(std::process::Stdio::piped());
100
101        #[cfg(windows)]
102        command_builder
103            .as_std_mut()
104            .creation_flags(CREATE_NO_WINDOW);
105
106        // codex-acp often returns only stopReason in session/prompt result.
107        // Enabling lightweight codex logs gives us process_output lines that
108        // include assistant deltas, which the CLI can aggregate as final output.
109        if resolved_command.ends_with("codex-acp") && std::env::var_os("RUST_LOG").is_none() {
110            command_builder.env(
111                "RUST_LOG",
112                "info,codex_acp::thread=info,codex_acp::codex_agent=info",
113            );
114        }
115
116        let mut child = command_builder.spawn().map_err(|e| match e.kind() {
117            ErrorKind::NotFound => {
118                let resolved_exists = Path::new(&resolved_command).exists();
119                if resolved_exists {
120                    format!(
121                        "Failed to execute '{command}' (resolved: '{resolved_command}'): {e}. The binary exists, but a required interpreter or wrapper target may be missing."
122                    )
123                } else {
124                    format!(
125                        "Failed to spawn '{command}' (resolved: '{resolved_command}'): {e}. Is it installed and in PATH?"
126                    )
127                }
128            }
129            _ => format!(
130                "Failed to spawn '{command}' (resolved: '{resolved_command}') from cwd '{cwd}': {e}"
131            ),
132        })?;
133
134        let stdin = child
135            .stdin
136            .take()
137            .ok_or_else(|| "No stdin on child process".to_string())?;
138        let stdout = child
139            .stdout
140            .take()
141            .ok_or_else(|| "No stdout on child process".to_string())?;
142        let stderr = child.stderr.take();
143
144        let alive = Arc::new(AtomicBool::new(true));
145        let pending: PendingMap = Arc::new(Mutex::new(HashMap::new()));
146        let stdin = Arc::new(Mutex::new(stdin));
147
148        let name = display_name.to_string();
149
150        // Log stderr in background and forward to frontend as process_output
151        if let Some(stderr) = stderr {
152            let name_clone = name.clone();
153            let ntx_stderr = notification_tx.clone();
154            let our_sid_stderr = our_session_id.to_string();
155            let resolved_command_stderr = resolved_command.clone();
156            tokio::spawn(async move {
157                let reader = BufReader::new(stderr);
158                let mut lines = reader.lines();
159                while let Ok(Some(line)) = lines.next_line().await {
160                    if !line.trim().is_empty() {
161                        if should_ignore_process_stderr(
162                            &resolved_command_stderr,
163                            &name_clone,
164                            &line,
165                        ) {
166                            continue;
167                        }
168                        tracing::debug!("[AcpProcess:{} stderr] {}", name_clone, line);
169                        // Forward stderr to frontend as process_output notification
170                        let notification = serde_json::json!({
171                            "jsonrpc": "2.0",
172                            "method": "session/update",
173                            "params": {
174                                "sessionId": our_sid_stderr,
175                                "update": {
176                                    "sessionUpdate": "process_output",
177                                    "source": "stderr",
178                                    "data": format!("{}\n", line),
179                                    "displayName": name_clone,
180                                }
181                            }
182                        });
183                        let _ = ntx_stderr.send(notification);
184                    }
185                }
186            });
187        }
188
189        // Background stdout reader — dispatches responses, notifications, agent requests
190        let alive_clone = alive.clone();
191        let pending_clone = pending.clone();
192        let ntx = notification_tx.clone();
193        let stdin_clone = stdin.clone();
194        let name_clone = name.clone();
195        let our_sid = our_session_id.to_string();
196        let cwd_clone = cwd.to_string();
197        let provider_clone = display_name.to_string();
198
199        let reader_handle = tokio::spawn(async move {
200            let reader = BufReader::new(stdout);
201            let mut lines = reader.lines();
202            // Buffer for accumulating agent message chunks
203            let mut agent_msg_buffer = String::new();
204            // Buffer for accumulating agent thought chunks
205            let mut agent_thought_buffer = String::new();
206            // Buffer for pending tool calls awaiting rawInput (OpenCode sends empty rawInput initially)
207            let mut pending_tool_calls: std::collections::HashMap<String, (String, bool)> =
208                std::collections::HashMap::new();
209
210            while let Ok(Some(line)) = lines.next_line().await {
211                let line = line.trim().to_string();
212                if line.is_empty() {
213                    continue;
214                }
215
216                let msg: serde_json::Value = match serde_json::from_str(&line) {
217                    Ok(v) => v,
218                    Err(_) => {
219                        // Try to find embedded JSON objects
220                        if let Some(v) = try_parse_embedded_json(&line) {
221                            v
222                        } else {
223                            tracing::debug!(
224                                "[AcpProcess:{}] Non-JSON stdout: {}",
225                                name_clone,
226                                truncate_content(&line, 200)
227                            );
228                            continue;
229                        }
230                    }
231                };
232
233                let has_id = msg.get("id").is_some() && !msg.get("id").unwrap().is_null();
234                let has_result = msg.get("result").is_some();
235                let has_error = msg.get("error").is_some();
236                let has_method = msg.get("method").and_then(|m| m.as_str()).is_some();
237
238                if has_id && (has_result || has_error) {
239                    // Response to a pending request
240                    let id = msg["id"].as_u64().unwrap_or(0);
241                    let mut map = pending_clone.lock().await;
242                    if let Some(tx) = map.remove(&id) {
243                        if has_error {
244                            let err_msg =
245                                msg["error"]["message"].as_str().unwrap_or("unknown error");
246                            let err_code = msg["error"]["code"].as_i64().unwrap_or(0);
247                            let _ = tx.send(Err(format!("ACP Error [{err_code}]: {err_msg}")));
248                        } else {
249                            let _ = tx.send(Ok(msg["result"].clone()));
250                        }
251                    }
252                } else if has_id && has_method {
253                    // Agent→Client request — handle it
254                    let method = msg["method"].as_str().unwrap_or("");
255                    let id_val = msg["id"].clone();
256                    tracing::info!(
257                        "[AcpProcess:{}] Agent request: {} (id={})",
258                        name_clone,
259                        method,
260                        id_val
261                    );
262                    let response =
263                        handle_agent_request(method, &msg["params"], &our_sid, &ntx).await;
264                    let reply = serde_json::json!({
265                        "jsonrpc": "2.0",
266                        "id": id_val,
267                        "result": response,
268                    });
269                    let data = format!("{}\n", serde_json::to_string(&reply).unwrap());
270                    let mut stdin = stdin_clone.lock().await;
271                    let _ = stdin.write_all(data.as_bytes()).await;
272                    let _ = stdin.flush().await;
273                } else if has_method {
274                    // Notification (no id) — forward to SSE
275                    // Rewrite the agent's sessionId to our session ID so the
276                    // frontend SSE stream can match on the correct session.
277                    let mut rewritten = msg.clone();
278                    if let Some(params) = rewritten.get_mut("params") {
279                        if params.get("sessionId").is_some() {
280                            params["sessionId"] = serde_json::Value::String(our_sid.clone());
281                        }
282                    }
283
284                    // ── Trace: various event types ─────────────────────────────
285                    if let Some(params) = msg.get("params") {
286                        if let Some(update) = params.get("update") {
287                            let session_update = update
288                                .get("sessionUpdate")
289                                .and_then(|v| v.as_str())
290                                .unwrap_or("");
291
292                            match session_update {
293                                "agent_thought_chunk" => {
294                                    // Accumulate thought chunks
295                                    let text = update
296                                        .get("content")
297                                        .and_then(|c| c.get("text"))
298                                        .and_then(|t| t.as_str())
299                                        .unwrap_or("");
300                                    agent_thought_buffer.push_str(text);
301                                    // Trace when buffer reaches 100+ chars
302                                    if agent_thought_buffer.len() >= 100 {
303                                        let record = TraceRecord::new(
304                                            &our_sid,
305                                            TraceEventType::AgentThought,
306                                            Contributor::new(&provider_clone, None),
307                                        )
308                                        .with_conversation(TraceConversation {
309                                            turn: None,
310                                            role: Some("assistant".to_string()),
311                                            content_preview: Some(truncate_content(
312                                                &agent_thought_buffer,
313                                                200,
314                                            )),
315                                            full_content: Some(agent_thought_buffer.clone()),
316                                        });
317                                        let writer = TraceWriter::new(&cwd_clone);
318                                        let _ = writer.append_safe(&record).await;
319                                        agent_thought_buffer.clear();
320                                    }
321                                }
322                                "agent_message_chunk" => {
323                                    // Accumulate message chunks
324                                    let text = update
325                                        .get("content")
326                                        .and_then(|c| c.get("text"))
327                                        .and_then(|t| t.as_str())
328                                        .unwrap_or("");
329                                    agent_msg_buffer.push_str(text);
330                                    // Trace when buffer reaches 100+ chars
331                                    if agent_msg_buffer.len() >= 100 {
332                                        let record = TraceRecord::new(
333                                            &our_sid,
334                                            TraceEventType::AgentMessage,
335                                            Contributor::new(&provider_clone, None),
336                                        )
337                                        .with_conversation(TraceConversation {
338                                            turn: None,
339                                            role: Some("assistant".to_string()),
340                                            content_preview: Some(truncate_content(
341                                                &agent_msg_buffer,
342                                                200,
343                                            )),
344                                            full_content: Some(agent_msg_buffer.clone()),
345                                        });
346                                        let writer = TraceWriter::new(&cwd_clone);
347                                        let _ = writer.append_safe(&record).await;
348                                        agent_msg_buffer.clear();
349                                    }
350                                }
351                                "agent_message" => {
352                                    // Full message - trace immediately
353                                    let text = update
354                                        .get("content")
355                                        .and_then(|c| c.get("text"))
356                                        .and_then(|t| t.as_str())
357                                        .unwrap_or("");
358                                    let record = TraceRecord::new(
359                                        &our_sid,
360                                        TraceEventType::AgentMessage,
361                                        Contributor::new(&provider_clone, None),
362                                    )
363                                    .with_conversation(TraceConversation {
364                                        turn: None,
365                                        role: Some("assistant".to_string()),
366                                        content_preview: Some(truncate_content(text, 200)),
367                                        full_content: Some(text.to_string()),
368                                    });
369                                    let writer = TraceWriter::new(&cwd_clone);
370                                    let _ = writer.append_safe(&record).await;
371                                }
372                                "tool_call" => {
373                                    // Tool call - OpenCode may send rawInput as empty initially
374                                    let tool_call_id =
375                                        update.get("toolCallId").and_then(|v| v.as_str());
376                                    let kind = update
377                                        .get("kind")
378                                        .and_then(|v| v.as_str())
379                                        .or_else(|| update.get("title").and_then(|v| v.as_str()))
380                                        .unwrap_or("unknown");
381                                    let raw_input = update.get("rawInput").cloned();
382
383                                    // Check if rawInput is empty or null
384                                    let has_input = raw_input.as_ref().is_some_and(|v| {
385                                        if let Some(obj) = v.as_object() {
386                                            !obj.is_empty()
387                                        } else {
388                                            !v.is_null()
389                                        }
390                                    });
391
392                                    if has_input {
393                                        // Trace immediately with full input (Claude Code behavior)
394                                        let record = TraceRecord::new(
395                                            &our_sid,
396                                            TraceEventType::ToolCall,
397                                            Contributor::new(&provider_clone, None),
398                                        )
399                                        .with_tool(TraceTool {
400                                            name: kind.to_string(),
401                                            tool_call_id: tool_call_id.map(|s| s.to_string()),
402                                            status: Some("running".to_string()),
403                                            input: raw_input,
404                                            output: None,
405                                        });
406                                        let writer = TraceWriter::new(&cwd_clone);
407                                        let _ = writer.append_safe(&record).await;
408                                    } else if let Some(id) = tool_call_id {
409                                        // OpenCode behavior: rawInput is empty, wait for tool_call_update
410                                        pending_tool_calls
411                                            .insert(id.to_string(), (kind.to_string(), false));
412                                    }
413                                }
414                                "tool_call_update" => {
415                                    // Tool update - may contain rawInput (OpenCode) or just rawOutput (completion)
416                                    let tool_call_id =
417                                        update.get("toolCallId").and_then(|v| v.as_str());
418                                    let kind = update
419                                        .get("kind")
420                                        .and_then(|v| v.as_str())
421                                        .or_else(|| update.get("title").and_then(|v| v.as_str()))
422                                        .unwrap_or("unknown");
423                                    let raw_input = update.get("rawInput").cloned();
424                                    let raw_output = update
425                                        .get("rawOutput")
426                                        .and_then(|v| v.as_str())
427                                        .map(|s| serde_json::Value::String(s.to_string()))
428                                        .or_else(|| update.get("rawOutput").cloned());
429                                    let status = update
430                                        .get("status")
431                                        .and_then(|v| v.as_str())
432                                        .unwrap_or("completed");
433
434                                    // Check if this update has rawInput and the tool_call wasn't traced yet
435                                    let has_input = raw_input.as_ref().is_some_and(|v| {
436                                        if let Some(obj) = v.as_object() {
437                                            !obj.is_empty()
438                                        } else {
439                                            !v.is_null()
440                                        }
441                                    });
442
443                                    if let Some(id) = tool_call_id {
444                                        if let Some((stored_kind, traced)) =
445                                            pending_tool_calls.get_mut(id)
446                                        {
447                                            if has_input && !*traced {
448                                                // Record the tool_call trace now with actual input
449                                                let record = TraceRecord::new(
450                                                    &our_sid,
451                                                    TraceEventType::ToolCall,
452                                                    Contributor::new(&provider_clone, None),
453                                                )
454                                                .with_tool(TraceTool {
455                                                    name: stored_kind.clone(),
456                                                    tool_call_id: Some(id.to_string()),
457                                                    status: Some("running".to_string()),
458                                                    input: raw_input.clone(),
459                                                    output: None,
460                                                });
461                                                let writer = TraceWriter::new(&cwd_clone);
462                                                let _ = writer.append_safe(&record).await;
463                                                *traced = true;
464                                            }
465                                        }
466                                    }
467
468                                    // Record tool_result trace when status indicates completion or we have output
469                                    let is_complete = status == "completed"
470                                        || status == "failed"
471                                        || raw_output.is_some();
472                                    if is_complete {
473                                        let record = TraceRecord::new(
474                                            &our_sid,
475                                            TraceEventType::ToolResult,
476                                            Contributor::new(&provider_clone, None),
477                                        )
478                                        .with_tool(TraceTool {
479                                            name: kind.to_string(),
480                                            tool_call_id: tool_call_id.map(|s| s.to_string()),
481                                            status: Some(status.to_string()),
482                                            input: None,
483                                            output: raw_output,
484                                        });
485                                        let writer = TraceWriter::new(&cwd_clone);
486                                        let _ = writer.append_safe(&record).await;
487
488                                        // Clean up pending entry
489                                        if let Some(id) = tool_call_id {
490                                            pending_tool_calls.remove(id);
491                                        }
492                                    }
493                                }
494                                _ => {}
495                            }
496                        }
497                    }
498
499                    let _ = ntx.send(rewritten);
500                } else {
501                    tracing::debug!(
502                        "[AcpProcess:{}] Unhandled message: {}",
503                        name_clone,
504                        truncate_content(&line, 200)
505                    );
506                }
507            }
508
509            // Flush any remaining buffered agent message content
510            if !agent_msg_buffer.is_empty() {
511                let record = TraceRecord::new(
512                    &our_sid,
513                    TraceEventType::AgentMessage,
514                    Contributor::new(&provider_clone, None),
515                )
516                .with_conversation(TraceConversation {
517                    turn: None,
518                    role: Some("assistant".to_string()),
519                    content_preview: Some(truncate_content(&agent_msg_buffer, 200)),
520                    full_content: Some(agent_msg_buffer.clone()),
521                });
522                let writer = TraceWriter::new(&cwd_clone);
523                let _ = writer.append_safe(&record).await;
524            }
525
526            // Flush any remaining buffered agent thought content
527            if !agent_thought_buffer.is_empty() {
528                let record = TraceRecord::new(
529                    &our_sid,
530                    TraceEventType::AgentThought,
531                    Contributor::new(&provider_clone, None),
532                )
533                .with_conversation(TraceConversation {
534                    turn: None,
535                    role: Some("assistant".to_string()),
536                    content_preview: Some(truncate_content(&agent_thought_buffer, 200)),
537                    full_content: Some(agent_thought_buffer.clone()),
538                });
539                let writer = TraceWriter::new(&cwd_clone);
540                let _ = writer.append_safe(&record).await;
541            }
542
543            alive_clone.store(false, Ordering::SeqCst);
544            tracing::info!("[AcpProcess:{}] stdout reader finished", name_clone);
545        });
546
547        // Wait briefly for process to stabilize
548        tokio::time::sleep(Duration::from_millis(300)).await;
549
550        if !alive.load(Ordering::SeqCst) {
551            return Err(format!("{display_name} process died during startup"));
552        }
553
554        tracing::info!("[AcpProcess:{}] Process started", display_name);
555
556        Ok(Self {
557            stdin,
558            child: Arc::new(Mutex::new(Some(child))),
559            pending,
560            next_id: Arc::new(AtomicU64::new(1)),
561            alive,
562            notification_tx,
563            display_name: display_name.to_string(),
564            command: command.to_string(),
565            _reader_handle: reader_handle,
566        })
567    }
568
569    /// Whether the process is still alive.
570    pub fn is_alive(&self) -> bool {
571        self.alive.load(Ordering::SeqCst)
572    }
573
574    /// Send a JSON-RPC request and wait for the response.
575    pub async fn send_request(
576        &self,
577        method: &str,
578        params: serde_json::Value,
579        timeout_ms: Option<u64>,
580    ) -> Result<serde_json::Value, String> {
581        if !self.is_alive() {
582            return Err(format!("{} process is not alive", self.display_name));
583        }
584
585        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
586        let (tx, rx) = oneshot::channel();
587
588        self.pending.lock().await.insert(id, tx);
589
590        let msg = serde_json::json!({
591            "jsonrpc": "2.0",
592            "id": id,
593            "method": method,
594            "params": params,
595        });
596        let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
597
598        {
599            let mut stdin = self.stdin.lock().await;
600            stdin
601                .write_all(data.as_bytes())
602                .await
603                .map_err(|e| format!("Write {method}: {e}"))?;
604            stdin
605                .flush()
606                .await
607                .map_err(|e| format!("Flush {method}: {e}"))?;
608        }
609
610        // Determine timeout based on method and command type
611        // npx/uvx agents may need longer timeout for first-time package download
612        let is_npx_or_uvx = self.command == "npx" || self.command == "uvx";
613        let default_timeout = match method {
614            "initialize" | "session/new" | "session/load" => {
615                if is_npx_or_uvx {
616                    120_000 // 2 min for npx/uvx (may need to download packages)
617                } else {
618                    15_000 // 15s for others
619                }
620            }
621            "session/prompt" => 300_000, // 5 min
622            _ => 30_000,
623        };
624        let timeout_dur = Duration::from_millis(timeout_ms.unwrap_or(default_timeout));
625
626        match tokio::time::timeout(timeout_dur, rx).await {
627            Ok(Ok(result)) => result,
628            Ok(Err(_)) => Err(format!("Channel closed for {method} (id={id})")),
629            Err(_) => {
630                self.pending.lock().await.remove(&id);
631                Err(format!(
632                    "Timeout waiting for {} (id={}, {}ms)",
633                    method,
634                    id,
635                    timeout_dur.as_millis()
636                ))
637            }
638        }
639    }
640
641    /// Initialize the ACP protocol.
642    pub async fn initialize(&self) -> Result<serde_json::Value, String> {
643        self.initialize_with_timeout(None).await
644    }
645
646    /// Initialize the ACP protocol with an optional timeout override.
647    pub async fn initialize_with_timeout(
648        &self,
649        timeout_ms: Option<u64>,
650    ) -> Result<serde_json::Value, String> {
651        let result = self
652            .send_request(
653                "initialize",
654                serde_json::json!({
655                    "protocolVersion": 1,
656                    "clientInfo": {
657                        "name": "routa-desktop",
658                        "version": "0.1.0"
659                    }
660                }),
661                timeout_ms,
662            )
663            .await?;
664        tracing::info!(
665            "[AcpProcess:{}] Initialized: {}",
666            self.display_name,
667            serde_json::to_string(&result).unwrap_or_default()
668        );
669        Ok(result)
670    }
671
672    /// Create a new ACP session. Returns the agent's session ID.
673    pub async fn new_session(
674        &self,
675        cwd: &str,
676        mcp_servers: &[serde_json::Value],
677    ) -> Result<String, String> {
678        let result = self
679            .send_request(
680                "session/new",
681                serde_json::json!({
682                    "cwd": cwd,
683                    "mcpServers": mcp_servers
684                }),
685                None,
686            )
687            .await?;
688
689        let session_id = result["sessionId"]
690            .as_str()
691            .ok_or_else(|| "No sessionId in session/new response".to_string())?
692            .to_string();
693
694        tracing::info!(
695            "[AcpProcess:{}] Session created: {}",
696            self.display_name,
697            session_id
698        );
699        Ok(session_id)
700    }
701
702    /// Load a persisted ACP session. Returns the agent's resumed session ID.
703    pub async fn load_session(
704        &self,
705        session_id: &str,
706        cwd: &str,
707        mcp_servers: &[serde_json::Value],
708    ) -> Result<String, String> {
709        let result = self
710            .send_request(
711                "session/load",
712                serde_json::json!({
713                    "sessionId": session_id,
714                    "cwd": cwd,
715                    "mcpServers": mcp_servers,
716                }),
717                None,
718            )
719            .await?;
720
721        let resumed_session_id = result["sessionId"]
722            .as_str()
723            .unwrap_or(session_id)
724            .to_string();
725
726        tracing::info!(
727            "[AcpProcess:{}] Session loaded: {}",
728            self.display_name,
729            resumed_session_id
730        );
731        Ok(resumed_session_id)
732    }
733
734    /// Send a prompt to an existing session. 5-minute timeout.
735    pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
736        self.send_request(
737            "session/prompt",
738            serde_json::json!({
739                "sessionId": session_id,
740                "prompt": [{ "type": "text", "text": text }]
741            }),
742            Some(300_000),
743        )
744        .await
745    }
746
747    /// Send session/cancel notification (no response expected).
748    pub async fn cancel(&self, session_id: &str) {
749        let msg = serde_json::json!({
750            "jsonrpc": "2.0",
751            "method": "session/cancel",
752            "params": { "sessionId": session_id }
753        });
754        let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
755        let mut stdin = self.stdin.lock().await;
756        let _ = stdin.write_all(data.as_bytes()).await;
757        let _ = stdin.flush().await;
758    }
759
760    /// Get the notification broadcast sender (for subscribing to SSE).
761    pub fn notification_sender(&self) -> &NotificationSender {
762        &self.notification_tx
763    }
764
765    /// Kill the agent process.
766    pub async fn kill(&self) {
767        self.alive.store(false, Ordering::SeqCst);
768        if let Some(mut child) = self.child.lock().await.take() {
769            tracing::info!("[AcpProcess:{}] Killing process", self.display_name);
770            let _ = child.kill().await;
771        }
772        // Reject all pending requests
773        let mut map = self.pending.lock().await;
774        for (_, tx) in map.drain() {
775            let _ = tx.send(Err("Process killed".to_string()));
776        }
777    }
778}
779
780/// Handle agent→client requests. Auto-approves permissions, handles fs ops.
781async fn handle_agent_request(
782    method: &str,
783    params: &serde_json::Value,
784    session_id: &str,
785    notification_tx: &NotificationSender,
786) -> serde_json::Value {
787    match method {
788        "session/request_permission" => {
789            tracing::info!("[AcpProcess] session/request_permission params={}", params);
790            build_permission_approval_result(params)
791        }
792        "fs/read_text_file" => {
793            let path = params["path"].as_str().unwrap_or("");
794            match tokio::fs::read_to_string(path).await {
795                Ok(content) => serde_json::json!({ "content": content }),
796                Err(e) => serde_json::json!({
797                    "error": format!("Failed to read file: {}", e)
798                }),
799            }
800        }
801        "fs/write_text_file" => {
802            let path = params["path"].as_str().unwrap_or("");
803            let content = params["content"].as_str().unwrap_or("");
804            if let Some(parent) = std::path::Path::new(path).parent() {
805                let _ = tokio::fs::create_dir_all(parent).await;
806            }
807            match tokio::fs::write(path, content).await {
808                Ok(_) => serde_json::json!({}),
809                Err(e) => serde_json::json!({
810                    "error": format!("Failed to write file: {}", e)
811                }),
812            }
813        }
814        "terminal/create" => {
815            match TerminalManager::global()
816                .create(params, session_id, notification_tx)
817                .await
818            {
819                Ok(result) => result,
820                Err(error) => serde_json::json!({ "error": error }),
821            }
822        }
823        "terminal/output" => {
824            let terminal_id = params["terminalId"].as_str().unwrap_or("");
825            match TerminalManager::global().get_output(terminal_id).await {
826                Ok(result) => result,
827                Err(error) => serde_json::json!({ "error": error }),
828            }
829        }
830        "terminal/wait_for_exit" => {
831            let terminal_id = params["terminalId"].as_str().unwrap_or("");
832            match TerminalManager::global().wait_for_exit(terminal_id).await {
833                Ok(result) => result,
834                Err(error) => serde_json::json!({ "error": error }),
835            }
836        }
837        "terminal/kill" => {
838            let terminal_id = params["terminalId"].as_str().unwrap_or("");
839            match TerminalManager::global().kill(terminal_id).await {
840                Ok(_) => serde_json::json!({}),
841                Err(error) => serde_json::json!({ "error": error }),
842            }
843        }
844        "terminal/release" => {
845            let terminal_id = params["terminalId"].as_str().unwrap_or("");
846            TerminalManager::global().release(terminal_id).await;
847            serde_json::json!({})
848        }
849        _ => {
850            tracing::warn!("[AcpProcess] Unknown agent request: {}", method);
851            serde_json::json!({})
852        }
853    }
854}
855
856/// Try to find and parse embedded JSON objects in a line.
857fn try_parse_embedded_json(line: &str) -> Option<serde_json::Value> {
858    let mut depth = 0i32;
859    let mut start = None;
860
861    for (i, ch) in line.char_indices() {
862        match ch {
863            '{' => {
864                if depth == 0 {
865                    start = Some(i);
866                }
867                depth += 1;
868            }
869            '}' => {
870                depth -= 1;
871                if depth == 0 {
872                    if let Some(s) = start {
873                        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line[s..=i]) {
874                            return Some(v);
875                        }
876                    }
877                    start = None;
878                }
879            }
880            _ => {}
881        }
882    }
883    None
884}
885
886/// Build permission approval response following ACP spec and codex-acp expectations.
887/// This mirrors the Next.js implementation in acp-process.ts.
888fn build_permission_approval_result(params: &serde_json::Value) -> serde_json::Value {
889    // Default scope is "turn"
890    let scope = "turn";
891
892    // Try to resolve optionId from params.options array
893    if let Some(option_id) = resolve_permission_option_id(params, scope) {
894        return serde_json::json!({
895            "outcome": {
896                "outcome": "selected",
897                "optionId": option_id
898            }
899        });
900    }
901
902    // Fallback: return cancelled outcome
903    serde_json::json!({
904        "outcome": {
905            "outcome": "cancelled"
906        }
907    })
908}
909
910/// Resolve the appropriate permission optionId from the options array.
911/// Prefers "approved" or "approved-for-session" for turn/session scope.
912fn resolve_permission_option_id(params: &serde_json::Value, scope: &str) -> Option<String> {
913    let options = params.get("options")?.as_array()?;
914
915    // Define preferred option IDs based on scope
916    let preferred_ids = if scope == "session" {
917        vec![
918            "approved-for-session",
919            "approved-always",
920            "approved-execpolicy-amendment",
921            "approved",
922        ]
923    } else {
924        vec![
925            "approved",
926            "approved-once",
927            "approved-for-session",
928            "approved-always",
929            "approved-execpolicy-amendment",
930        ]
931    };
932
933    // Define preferred option kinds
934    let preferred_kinds = if scope == "session" {
935        vec!["allow_always", "allow_once"]
936    } else {
937        vec!["allow_once", "allow_always"]
938    };
939
940    // First, try to find by preferred optionId
941    for pref_id in &preferred_ids {
942        for option in options {
943            if let Some(option_id) = option.get("optionId").and_then(|v| v.as_str()) {
944                if option_id == *pref_id {
945                    return Some(option_id.to_string());
946                }
947            }
948        }
949    }
950
951    // Second, try to find by preferred kind
952    for pref_kind in &preferred_kinds {
953        for option in options {
954            if let Some(kind) = option.get("kind").and_then(|v| v.as_str()) {
955                if kind == *pref_kind {
956                    if let Some(option_id) = option.get("optionId").and_then(|v| v.as_str()) {
957                        return Some(option_id.to_string());
958                    }
959                }
960            }
961        }
962    }
963
964    // Fallback: use first available option or default
965    if let Some(first_option) = options.first() {
966        if let Some(option_id) = first_option.get("optionId").and_then(|v| v.as_str()) {
967            return Some(option_id.to_string());
968        }
969    }
970
971    // Last resort: return default based on scope
972    Some(if scope == "session" {
973        "approved-for-session".to_string()
974    } else {
975        "approved".to_string()
976    })
977}
978
979/// Safely truncate a string at a UTF-8 character boundary.
980/// Returns a substring of at most `max_bytes` bytes, but ensures it doesn't
981/// cut in the middle of a multi-byte UTF-8 character.
982fn truncate_content(s: &str, max_bytes: usize) -> String {
983    if s.len() <= max_bytes {
984        return s.to_string();
985    }
986
987    // Find the last valid UTF-8 character boundary before max_bytes
988    let mut end = max_bytes;
989    while end > 0 && !s.is_char_boundary(end) {
990        end -= 1;
991    }
992
993    s[..end].to_string()
994}
995
996fn should_ignore_process_stderr(command: &str, display_name: &str, line: &str) -> bool {
997    is_codex_process(command, display_name) && is_codex_otel_stderr(line)
998}
999
1000fn is_codex_process(command: &str, display_name: &str) -> bool {
1001    let trimmed_command = command.trim();
1002    display_name.trim().eq_ignore_ascii_case("codex")
1003        || trimmed_command.eq_ignore_ascii_case("codex-acp")
1004        || trimmed_command.ends_with("/codex-acp")
1005}
1006
1007fn is_codex_otel_stderr(line: &str) -> bool {
1008    let trimmed = line.trim();
1009    trimmed.contains("codex_otel.log_only:") || trimmed.contains("codex_otel.trace_safe:")
1010}
1011
1012#[cfg(test)]
1013mod tests {
1014    use super::{is_codex_otel_stderr, resolve_permission_option_id, should_ignore_process_stderr};
1015    use serde_json::json;
1016
1017    #[test]
1018    fn ignores_codex_otel_stderr_noise() {
1019        let line = "INFO ... codex_otel.log_only: event.kind=response.output_text.delta";
1020        assert!(should_ignore_process_stderr(
1021            "/opt/homebrew/bin/codex-acp",
1022            "Codex",
1023            line
1024        ));
1025        assert!(is_codex_otel_stderr(line));
1026    }
1027
1028    #[test]
1029    fn preserves_non_otel_codex_stderr() {
1030        let line = "ERROR codex_api::endpoint::responses_websocket: failed to connect";
1031        assert!(!should_ignore_process_stderr(
1032            "/opt/homebrew/bin/codex-acp",
1033            "Codex",
1034            line
1035        ));
1036    }
1037
1038    #[test]
1039    fn preserves_other_provider_stderr() {
1040        let line = "INFO ... codex_otel.log_only: event.kind=response.output_text.delta";
1041        assert!(!should_ignore_process_stderr(
1042            "/usr/bin/opencode",
1043            "OpenCode",
1044            line
1045        ));
1046    }
1047
1048    #[test]
1049    fn resolve_permission_option_id_prefers_turn_approval() {
1050        let params = json!({
1051            "options": [
1052                { "optionId": "denied", "kind": "deny_once" },
1053                { "optionId": "approved", "kind": "allow_once" }
1054            ]
1055        });
1056
1057        assert_eq!(
1058            resolve_permission_option_id(&params, "turn").as_deref(),
1059            Some("approved")
1060        );
1061    }
1062}