Skip to main content

routa_core/acp/
process.rs

1//! AcpProcess — manages a single ACP agent child process with JSON-RPC over stdio.
2//!
3//! The lifecycle mirrors the Next.js `AcpProcess` class:
4//!   1. `spawn(command, args)` — start the child, launch a background stdout reader
5//!   2. `initialize()`         — send "initialize" request, wait for response
6//!   3. `new_session(cwd)`     — send "session/new", get back sessionId
7//!   4. `prompt(sid, text)`    — send "session/prompt" (5-min timeout), stream via SSE
8//!   5. `kill()`               — terminate the process
9//!
10//! Agent→client requests (permissions, fs, terminal) are handled in the background reader.
11//! Agent message notifications are traced to JSONL files for attribution tracking.
12
13use std::collections::HashMap;
14use std::io::ErrorKind;
15#[cfg(windows)]
16use std::os::windows::process::CommandExt;
17use std::path::Path;
18use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19use std::sync::Arc;
20use std::time::Duration;
21
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::process::{Child, ChildStdin};
24use tokio::sync::{broadcast, oneshot, Mutex};
25
26use super::terminal_manager::TerminalManager;
27#[cfg(windows)]
28use super::CREATE_NO_WINDOW;
29use crate::trace::{
30    Contributor, TraceConversation, TraceEventType, TraceRecord, TraceTool, TraceWriter,
31};
32
33/// Callback type for session/update notifications from the agent.
34pub type NotificationSender = broadcast::Sender<serde_json::Value>;
35
36/// Type alias for the pending request map to avoid complex type repetition.
37type PendingMap = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<serde_json::Value, String>>>>>;
38
39/// A managed ACP agent child process.
40pub struct AcpProcess {
41    stdin: Arc<Mutex<ChildStdin>>,
42    child: Arc<Mutex<Option<Child>>>,
43    pending: PendingMap,
44    next_id: Arc<AtomicU64>,
45    alive: Arc<AtomicBool>,
46    notification_tx: NotificationSender,
47    display_name: String,
48    /// The command used to spawn this process (e.g., "npx", "uvx", "opencode")
49    command: String,
50    _reader_handle: tokio::task::JoinHandle<()>,
51}
52
53impl AcpProcess {
54    /// Spawn the agent process and start the background reader.
55    ///
56    /// `our_session_id` is used to rewrite the agent's session ID in notifications
57    /// so the frontend SSE stream matches on the correct session.
58    pub async fn spawn(
59        command: &str,
60        args: &[&str],
61        cwd: &str,
62        notification_tx: NotificationSender,
63        display_name: &str,
64        our_session_id: &str,
65    ) -> Result<Self, String> {
66        tracing::info!(
67            "[AcpProcess:{}] Spawning: {} {} (cwd: {})",
68            display_name,
69            command,
70            args.join(" "),
71            cwd,
72        );
73
74        let cwd_path = Path::new(cwd);
75        if !cwd_path.exists() {
76            return Err(format!(
77                "Invalid session cwd '{}': directory does not exist",
78                cwd
79            ));
80        }
81        if !cwd_path.is_dir() {
82            return Err(format!(
83                "Invalid session cwd '{}': path is not a directory",
84                cwd
85            ));
86        }
87
88        // Resolve the actual binary path using the full shell PATH
89        // (macOS GUI apps have a minimal PATH that won't find user CLI tools)
90        let resolved_command =
91            crate::shell_env::which(command).unwrap_or_else(|| command.to_string());
92
93        let mut command_builder = tokio::process::Command::new(&resolved_command);
94        command_builder
95            .args(args)
96            .current_dir(cwd)
97            .env("PATH", crate::shell_env::full_path())
98            .env("NODE_NO_READLINE", "1")
99            .stdin(std::process::Stdio::piped())
100            .stdout(std::process::Stdio::piped())
101            .stderr(std::process::Stdio::piped());
102
103        #[cfg(windows)]
104        command_builder
105            .as_std_mut()
106            .creation_flags(CREATE_NO_WINDOW);
107
108        // codex-acp often returns only stopReason in session/prompt result.
109        // Enabling lightweight codex logs gives us process_output lines that
110        // include assistant deltas, which the CLI can aggregate as final output.
111        if resolved_command.ends_with("codex-acp") && std::env::var_os("RUST_LOG").is_none() {
112            command_builder.env(
113                "RUST_LOG",
114                "info,codex_acp::thread=info,codex_acp::codex_agent=info",
115            );
116        }
117
118        let mut child = command_builder.spawn().map_err(|e| match e.kind() {
119            ErrorKind::NotFound => {
120                let resolved_exists = Path::new(&resolved_command).exists();
121                if resolved_exists {
122                    format!(
123                        "Failed to execute '{}' (resolved: '{}'): {}. The binary exists, but a required interpreter or wrapper target may be missing.",
124                        command, resolved_command, e
125                    )
126                } else {
127                    format!(
128                        "Failed to spawn '{}' (resolved: '{}'): {}. Is it installed and in PATH?",
129                        command, resolved_command, e
130                    )
131                }
132            }
133            _ => format!(
134                "Failed to spawn '{}' (resolved: '{}') from cwd '{}': {}",
135                command, resolved_command, cwd, e
136            ),
137        })?;
138
139        let stdin = child
140            .stdin
141            .take()
142            .ok_or_else(|| "No stdin on child process".to_string())?;
143        let stdout = child
144            .stdout
145            .take()
146            .ok_or_else(|| "No stdout on child process".to_string())?;
147        let stderr = child.stderr.take();
148
149        let alive = Arc::new(AtomicBool::new(true));
150        let pending: PendingMap = Arc::new(Mutex::new(HashMap::new()));
151        let stdin = Arc::new(Mutex::new(stdin));
152
153        let name = display_name.to_string();
154
155        // Log stderr in background and forward to frontend as process_output
156        if let Some(stderr) = stderr {
157            let name_clone = name.clone();
158            let ntx_stderr = notification_tx.clone();
159            let our_sid_stderr = our_session_id.to_string();
160            let resolved_command_stderr = resolved_command.clone();
161            tokio::spawn(async move {
162                let reader = BufReader::new(stderr);
163                let mut lines = reader.lines();
164                while let Ok(Some(line)) = lines.next_line().await {
165                    if !line.trim().is_empty() {
166                        if should_ignore_process_stderr(
167                            &resolved_command_stderr,
168                            &name_clone,
169                            &line,
170                        ) {
171                            continue;
172                        }
173                        tracing::debug!("[AcpProcess:{} stderr] {}", name_clone, line);
174                        // Forward stderr to frontend as process_output notification
175                        let notification = serde_json::json!({
176                            "jsonrpc": "2.0",
177                            "method": "session/update",
178                            "params": {
179                                "sessionId": our_sid_stderr,
180                                "update": {
181                                    "sessionUpdate": "process_output",
182                                    "source": "stderr",
183                                    "data": format!("{}\n", line),
184                                    "displayName": name_clone,
185                                }
186                            }
187                        });
188                        let _ = ntx_stderr.send(notification);
189                    }
190                }
191            });
192        }
193
194        // Background stdout reader — dispatches responses, notifications, agent requests
195        let alive_clone = alive.clone();
196        let pending_clone = pending.clone();
197        let ntx = notification_tx.clone();
198        let stdin_clone = stdin.clone();
199        let name_clone = name.clone();
200        let our_sid = our_session_id.to_string();
201        let cwd_clone = cwd.to_string();
202        let provider_clone = display_name.to_string();
203
204        let reader_handle = tokio::spawn(async move {
205            let reader = BufReader::new(stdout);
206            let mut lines = reader.lines();
207            // Buffer for accumulating agent message chunks
208            let mut agent_msg_buffer = String::new();
209            // Buffer for accumulating agent thought chunks
210            let mut agent_thought_buffer = String::new();
211            // Buffer for pending tool calls awaiting rawInput (OpenCode sends empty rawInput initially)
212            let mut pending_tool_calls: std::collections::HashMap<String, (String, bool)> =
213                std::collections::HashMap::new();
214
215            while let Ok(Some(line)) = lines.next_line().await {
216                let line = line.trim().to_string();
217                if line.is_empty() {
218                    continue;
219                }
220
221                let msg: serde_json::Value = match serde_json::from_str(&line) {
222                    Ok(v) => v,
223                    Err(_) => {
224                        // Try to find embedded JSON objects
225                        if let Some(v) = try_parse_embedded_json(&line) {
226                            v
227                        } else {
228                            tracing::debug!(
229                                "[AcpProcess:{}] Non-JSON stdout: {}",
230                                name_clone,
231                                truncate_content(&line, 200)
232                            );
233                            continue;
234                        }
235                    }
236                };
237
238                let has_id = msg.get("id").is_some() && !msg.get("id").unwrap().is_null();
239                let has_result = msg.get("result").is_some();
240                let has_error = msg.get("error").is_some();
241                let has_method = msg.get("method").and_then(|m| m.as_str()).is_some();
242
243                if has_id && (has_result || has_error) {
244                    // Response to a pending request
245                    let id = msg["id"].as_u64().unwrap_or(0);
246                    let mut map = pending_clone.lock().await;
247                    if let Some(tx) = map.remove(&id) {
248                        if has_error {
249                            let err_msg =
250                                msg["error"]["message"].as_str().unwrap_or("unknown error");
251                            let err_code = msg["error"]["code"].as_i64().unwrap_or(0);
252                            let _ = tx.send(Err(format!("ACP Error [{}]: {}", err_code, err_msg)));
253                        } else {
254                            let _ = tx.send(Ok(msg["result"].clone()));
255                        }
256                    }
257                } else if has_id && has_method {
258                    // Agent→Client request — handle it
259                    let method = msg["method"].as_str().unwrap_or("");
260                    let id_val = msg["id"].clone();
261                    tracing::info!(
262                        "[AcpProcess:{}] Agent request: {} (id={})",
263                        name_clone,
264                        method,
265                        id_val
266                    );
267                    let response =
268                        handle_agent_request(method, &msg["params"], &our_sid, &ntx).await;
269                    let reply = serde_json::json!({
270                        "jsonrpc": "2.0",
271                        "id": id_val,
272                        "result": response,
273                    });
274                    let data = format!("{}\n", serde_json::to_string(&reply).unwrap());
275                    let mut stdin = stdin_clone.lock().await;
276                    let _ = stdin.write_all(data.as_bytes()).await;
277                    let _ = stdin.flush().await;
278                } else if has_method {
279                    // Notification (no id) — forward to SSE
280                    // Rewrite the agent's sessionId to our session ID so the
281                    // frontend SSE stream can match on the correct session.
282                    let mut rewritten = msg.clone();
283                    if let Some(params) = rewritten.get_mut("params") {
284                        if params.get("sessionId").is_some() {
285                            params["sessionId"] = serde_json::Value::String(our_sid.clone());
286                        }
287                    }
288
289                    // ── Trace: various event types ─────────────────────────────
290                    if let Some(params) = msg.get("params") {
291                        if let Some(update) = params.get("update") {
292                            let session_update = update
293                                .get("sessionUpdate")
294                                .and_then(|v| v.as_str())
295                                .unwrap_or("");
296
297                            match session_update {
298                                "agent_thought_chunk" => {
299                                    // Accumulate thought chunks
300                                    let text = update
301                                        .get("content")
302                                        .and_then(|c| c.get("text"))
303                                        .and_then(|t| t.as_str())
304                                        .unwrap_or("");
305                                    agent_thought_buffer.push_str(text);
306                                    // Trace when buffer reaches 100+ chars
307                                    if agent_thought_buffer.len() >= 100 {
308                                        let record = TraceRecord::new(
309                                            &our_sid,
310                                            TraceEventType::AgentThought,
311                                            Contributor::new(&provider_clone, None),
312                                        )
313                                        .with_conversation(TraceConversation {
314                                            turn: None,
315                                            role: Some("assistant".to_string()),
316                                            content_preview: Some(truncate_content(
317                                                &agent_thought_buffer,
318                                                200,
319                                            )),
320                                            full_content: Some(agent_thought_buffer.clone()),
321                                        });
322                                        let writer = TraceWriter::new(&cwd_clone);
323                                        let _ = writer.append_safe(&record).await;
324                                        agent_thought_buffer.clear();
325                                    }
326                                }
327                                "agent_message_chunk" => {
328                                    // Accumulate message chunks
329                                    let text = update
330                                        .get("content")
331                                        .and_then(|c| c.get("text"))
332                                        .and_then(|t| t.as_str())
333                                        .unwrap_or("");
334                                    agent_msg_buffer.push_str(text);
335                                    // Trace when buffer reaches 100+ chars
336                                    if agent_msg_buffer.len() >= 100 {
337                                        let record = TraceRecord::new(
338                                            &our_sid,
339                                            TraceEventType::AgentMessage,
340                                            Contributor::new(&provider_clone, None),
341                                        )
342                                        .with_conversation(TraceConversation {
343                                            turn: None,
344                                            role: Some("assistant".to_string()),
345                                            content_preview: Some(truncate_content(
346                                                &agent_msg_buffer,
347                                                200,
348                                            )),
349                                            full_content: Some(agent_msg_buffer.clone()),
350                                        });
351                                        let writer = TraceWriter::new(&cwd_clone);
352                                        let _ = writer.append_safe(&record).await;
353                                        agent_msg_buffer.clear();
354                                    }
355                                }
356                                "agent_message" => {
357                                    // Full message - trace immediately
358                                    let text = update
359                                        .get("content")
360                                        .and_then(|c| c.get("text"))
361                                        .and_then(|t| t.as_str())
362                                        .unwrap_or("");
363                                    let record = TraceRecord::new(
364                                        &our_sid,
365                                        TraceEventType::AgentMessage,
366                                        Contributor::new(&provider_clone, None),
367                                    )
368                                    .with_conversation(TraceConversation {
369                                        turn: None,
370                                        role: Some("assistant".to_string()),
371                                        content_preview: Some(truncate_content(text, 200)),
372                                        full_content: Some(text.to_string()),
373                                    });
374                                    let writer = TraceWriter::new(&cwd_clone);
375                                    let _ = writer.append_safe(&record).await;
376                                }
377                                "tool_call" => {
378                                    // Tool call - OpenCode may send rawInput as empty initially
379                                    let tool_call_id =
380                                        update.get("toolCallId").and_then(|v| v.as_str());
381                                    let kind = update
382                                        .get("kind")
383                                        .and_then(|v| v.as_str())
384                                        .or_else(|| update.get("title").and_then(|v| v.as_str()))
385                                        .unwrap_or("unknown");
386                                    let raw_input = update.get("rawInput").cloned();
387
388                                    // Check if rawInput is empty or null
389                                    let has_input = raw_input.as_ref().is_some_and(|v| {
390                                        if let Some(obj) = v.as_object() {
391                                            !obj.is_empty()
392                                        } else {
393                                            !v.is_null()
394                                        }
395                                    });
396
397                                    if has_input {
398                                        // Trace immediately with full input (Claude Code behavior)
399                                        let record = TraceRecord::new(
400                                            &our_sid,
401                                            TraceEventType::ToolCall,
402                                            Contributor::new(&provider_clone, None),
403                                        )
404                                        .with_tool(TraceTool {
405                                            name: kind.to_string(),
406                                            tool_call_id: tool_call_id.map(|s| s.to_string()),
407                                            status: Some("running".to_string()),
408                                            input: raw_input,
409                                            output: None,
410                                        });
411                                        let writer = TraceWriter::new(&cwd_clone);
412                                        let _ = writer.append_safe(&record).await;
413                                    } else if let Some(id) = tool_call_id {
414                                        // OpenCode behavior: rawInput is empty, wait for tool_call_update
415                                        pending_tool_calls
416                                            .insert(id.to_string(), (kind.to_string(), false));
417                                    }
418                                }
419                                "tool_call_update" => {
420                                    // Tool update - may contain rawInput (OpenCode) or just rawOutput (completion)
421                                    let tool_call_id =
422                                        update.get("toolCallId").and_then(|v| v.as_str());
423                                    let kind = update
424                                        .get("kind")
425                                        .and_then(|v| v.as_str())
426                                        .or_else(|| update.get("title").and_then(|v| v.as_str()))
427                                        .unwrap_or("unknown");
428                                    let raw_input = update.get("rawInput").cloned();
429                                    let raw_output = update
430                                        .get("rawOutput")
431                                        .and_then(|v| v.as_str())
432                                        .map(|s| serde_json::Value::String(s.to_string()))
433                                        .or_else(|| update.get("rawOutput").cloned());
434                                    let status = update
435                                        .get("status")
436                                        .and_then(|v| v.as_str())
437                                        .unwrap_or("completed");
438
439                                    // Check if this update has rawInput and the tool_call wasn't traced yet
440                                    let has_input = raw_input.as_ref().is_some_and(|v| {
441                                        if let Some(obj) = v.as_object() {
442                                            !obj.is_empty()
443                                        } else {
444                                            !v.is_null()
445                                        }
446                                    });
447
448                                    if let Some(id) = tool_call_id {
449                                        if let Some((stored_kind, traced)) =
450                                            pending_tool_calls.get_mut(id)
451                                        {
452                                            if has_input && !*traced {
453                                                // Record the tool_call trace now with actual input
454                                                let record = TraceRecord::new(
455                                                    &our_sid,
456                                                    TraceEventType::ToolCall,
457                                                    Contributor::new(&provider_clone, None),
458                                                )
459                                                .with_tool(TraceTool {
460                                                    name: stored_kind.clone(),
461                                                    tool_call_id: Some(id.to_string()),
462                                                    status: Some("running".to_string()),
463                                                    input: raw_input.clone(),
464                                                    output: None,
465                                                });
466                                                let writer = TraceWriter::new(&cwd_clone);
467                                                let _ = writer.append_safe(&record).await;
468                                                *traced = true;
469                                            }
470                                        }
471                                    }
472
473                                    // Record tool_result trace when status indicates completion or we have output
474                                    let is_complete = status == "completed"
475                                        || status == "failed"
476                                        || raw_output.is_some();
477                                    if is_complete {
478                                        let record = TraceRecord::new(
479                                            &our_sid,
480                                            TraceEventType::ToolResult,
481                                            Contributor::new(&provider_clone, None),
482                                        )
483                                        .with_tool(TraceTool {
484                                            name: kind.to_string(),
485                                            tool_call_id: tool_call_id.map(|s| s.to_string()),
486                                            status: Some(status.to_string()),
487                                            input: None,
488                                            output: raw_output,
489                                        });
490                                        let writer = TraceWriter::new(&cwd_clone);
491                                        let _ = writer.append_safe(&record).await;
492
493                                        // Clean up pending entry
494                                        if let Some(id) = tool_call_id {
495                                            pending_tool_calls.remove(id);
496                                        }
497                                    }
498                                }
499                                _ => {}
500                            }
501                        }
502                    }
503
504                    let _ = ntx.send(rewritten);
505                } else {
506                    tracing::debug!(
507                        "[AcpProcess:{}] Unhandled message: {}",
508                        name_clone,
509                        truncate_content(&line, 200)
510                    );
511                }
512            }
513
514            // Flush any remaining buffered agent message content
515            if !agent_msg_buffer.is_empty() {
516                let record = TraceRecord::new(
517                    &our_sid,
518                    TraceEventType::AgentMessage,
519                    Contributor::new(&provider_clone, None),
520                )
521                .with_conversation(TraceConversation {
522                    turn: None,
523                    role: Some("assistant".to_string()),
524                    content_preview: Some(truncate_content(&agent_msg_buffer, 200)),
525                    full_content: Some(agent_msg_buffer.clone()),
526                });
527                let writer = TraceWriter::new(&cwd_clone);
528                let _ = writer.append_safe(&record).await;
529            }
530
531            // Flush any remaining buffered agent thought content
532            if !agent_thought_buffer.is_empty() {
533                let record = TraceRecord::new(
534                    &our_sid,
535                    TraceEventType::AgentThought,
536                    Contributor::new(&provider_clone, None),
537                )
538                .with_conversation(TraceConversation {
539                    turn: None,
540                    role: Some("assistant".to_string()),
541                    content_preview: Some(truncate_content(&agent_thought_buffer, 200)),
542                    full_content: Some(agent_thought_buffer.clone()),
543                });
544                let writer = TraceWriter::new(&cwd_clone);
545                let _ = writer.append_safe(&record).await;
546            }
547
548            alive_clone.store(false, Ordering::SeqCst);
549            tracing::info!("[AcpProcess:{}] stdout reader finished", name_clone);
550        });
551
552        // Wait briefly for process to stabilize
553        tokio::time::sleep(Duration::from_millis(300)).await;
554
555        if !alive.load(Ordering::SeqCst) {
556            return Err(format!("{} process died during startup", display_name));
557        }
558
559        tracing::info!("[AcpProcess:{}] Process started", display_name);
560
561        Ok(Self {
562            stdin,
563            child: Arc::new(Mutex::new(Some(child))),
564            pending,
565            next_id: Arc::new(AtomicU64::new(1)),
566            alive,
567            notification_tx,
568            display_name: display_name.to_string(),
569            command: command.to_string(),
570            _reader_handle: reader_handle,
571        })
572    }
573
574    /// Whether the process is still alive.
575    pub fn is_alive(&self) -> bool {
576        self.alive.load(Ordering::SeqCst)
577    }
578
579    /// Send a JSON-RPC request and wait for the response.
580    pub async fn send_request(
581        &self,
582        method: &str,
583        params: serde_json::Value,
584        timeout_ms: Option<u64>,
585    ) -> Result<serde_json::Value, String> {
586        if !self.is_alive() {
587            return Err(format!("{} process is not alive", self.display_name));
588        }
589
590        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
591        let (tx, rx) = oneshot::channel();
592
593        self.pending.lock().await.insert(id, tx);
594
595        let msg = serde_json::json!({
596            "jsonrpc": "2.0",
597            "id": id,
598            "method": method,
599            "params": params,
600        });
601        let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
602
603        {
604            let mut stdin = self.stdin.lock().await;
605            stdin
606                .write_all(data.as_bytes())
607                .await
608                .map_err(|e| format!("Write {}: {}", method, e))?;
609            stdin
610                .flush()
611                .await
612                .map_err(|e| format!("Flush {}: {}", method, e))?;
613        }
614
615        // Determine timeout based on method and command type
616        // npx/uvx agents may need longer timeout for first-time package download
617        let is_npx_or_uvx = self.command == "npx" || self.command == "uvx";
618        let default_timeout = match method {
619            "initialize" | "session/new" | "session/load" => {
620                if is_npx_or_uvx {
621                    120_000 // 2 min for npx/uvx (may need to download packages)
622                } else {
623                    15_000 // 15s for others
624                }
625            }
626            "session/prompt" => 300_000, // 5 min
627            _ => 30_000,
628        };
629        let timeout_dur = Duration::from_millis(timeout_ms.unwrap_or(default_timeout));
630
631        match tokio::time::timeout(timeout_dur, rx).await {
632            Ok(Ok(result)) => result,
633            Ok(Err(_)) => Err(format!("Channel closed for {} (id={})", method, id)),
634            Err(_) => {
635                self.pending.lock().await.remove(&id);
636                Err(format!(
637                    "Timeout waiting for {} (id={}, {}ms)",
638                    method,
639                    id,
640                    timeout_dur.as_millis()
641                ))
642            }
643        }
644    }
645
646    /// Initialize the ACP protocol.
647    pub async fn initialize(&self) -> Result<serde_json::Value, String> {
648        self.initialize_with_timeout(None).await
649    }
650
651    /// Initialize the ACP protocol with an optional timeout override.
652    pub async fn initialize_with_timeout(
653        &self,
654        timeout_ms: Option<u64>,
655    ) -> Result<serde_json::Value, String> {
656        let result = self
657            .send_request(
658                "initialize",
659                serde_json::json!({
660                    "protocolVersion": 1,
661                    "clientInfo": {
662                        "name": "routa-desktop",
663                        "version": "0.1.0"
664                    }
665                }),
666                timeout_ms,
667            )
668            .await?;
669        tracing::info!(
670            "[AcpProcess:{}] Initialized: {}",
671            self.display_name,
672            serde_json::to_string(&result).unwrap_or_default()
673        );
674        Ok(result)
675    }
676
677    /// Create a new ACP session. Returns the agent's session ID.
678    pub async fn new_session(
679        &self,
680        cwd: &str,
681        mcp_servers: &[serde_json::Value],
682    ) -> Result<String, String> {
683        let result = self
684            .send_request(
685                "session/new",
686                serde_json::json!({
687                    "cwd": cwd,
688                    "mcpServers": mcp_servers
689                }),
690                None,
691            )
692            .await?;
693
694        let session_id = result["sessionId"]
695            .as_str()
696            .ok_or_else(|| "No sessionId in session/new response".to_string())?
697            .to_string();
698
699        tracing::info!(
700            "[AcpProcess:{}] Session created: {}",
701            self.display_name,
702            session_id
703        );
704        Ok(session_id)
705    }
706
707    /// Load a persisted ACP session. Returns the agent's resumed session ID.
708    pub async fn load_session(
709        &self,
710        session_id: &str,
711        cwd: &str,
712        mcp_servers: &[serde_json::Value],
713    ) -> Result<String, String> {
714        let result = self
715            .send_request(
716                "session/load",
717                serde_json::json!({
718                    "sessionId": session_id,
719                    "cwd": cwd,
720                    "mcpServers": mcp_servers,
721                }),
722                None,
723            )
724            .await?;
725
726        let resumed_session_id = result["sessionId"]
727            .as_str()
728            .unwrap_or(session_id)
729            .to_string();
730
731        tracing::info!(
732            "[AcpProcess:{}] Session loaded: {}",
733            self.display_name,
734            resumed_session_id
735        );
736        Ok(resumed_session_id)
737    }
738
739    /// Send a prompt to an existing session. 5-minute timeout.
740    pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
741        self.send_request(
742            "session/prompt",
743            serde_json::json!({
744                "sessionId": session_id,
745                "prompt": [{ "type": "text", "text": text }]
746            }),
747            Some(300_000),
748        )
749        .await
750    }
751
752    /// Send session/cancel notification (no response expected).
753    pub async fn cancel(&self, session_id: &str) {
754        let msg = serde_json::json!({
755            "jsonrpc": "2.0",
756            "method": "session/cancel",
757            "params": { "sessionId": session_id }
758        });
759        let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
760        let mut stdin = self.stdin.lock().await;
761        let _ = stdin.write_all(data.as_bytes()).await;
762        let _ = stdin.flush().await;
763    }
764
765    /// Get the notification broadcast sender (for subscribing to SSE).
766    pub fn notification_sender(&self) -> &NotificationSender {
767        &self.notification_tx
768    }
769
770    /// Kill the agent process.
771    pub async fn kill(&self) {
772        self.alive.store(false, Ordering::SeqCst);
773        if let Some(mut child) = self.child.lock().await.take() {
774            tracing::info!("[AcpProcess:{}] Killing process", self.display_name);
775            let _ = child.kill().await;
776        }
777        // Reject all pending requests
778        let mut map = self.pending.lock().await;
779        for (_, tx) in map.drain() {
780            let _ = tx.send(Err("Process killed".to_string()));
781        }
782    }
783}
784
785/// Handle agent→client requests. Auto-approves permissions, handles fs ops.
786async fn handle_agent_request(
787    method: &str,
788    params: &serde_json::Value,
789    session_id: &str,
790    notification_tx: &NotificationSender,
791) -> serde_json::Value {
792    match method {
793        "session/request_permission" => {
794            tracing::info!("[AcpProcess] session/request_permission params={}", params);
795            build_permission_approval_result(params)
796        }
797        "fs/read_text_file" => {
798            let path = params["path"].as_str().unwrap_or("");
799            match tokio::fs::read_to_string(path).await {
800                Ok(content) => serde_json::json!({ "content": content }),
801                Err(e) => serde_json::json!({
802                    "error": format!("Failed to read file: {}", e)
803                }),
804            }
805        }
806        "fs/write_text_file" => {
807            let path = params["path"].as_str().unwrap_or("");
808            let content = params["content"].as_str().unwrap_or("");
809            if let Some(parent) = std::path::Path::new(path).parent() {
810                let _ = tokio::fs::create_dir_all(parent).await;
811            }
812            match tokio::fs::write(path, content).await {
813                Ok(_) => serde_json::json!({}),
814                Err(e) => serde_json::json!({
815                    "error": format!("Failed to write file: {}", e)
816                }),
817            }
818        }
819        "terminal/create" => {
820            match TerminalManager::global()
821                .create(params, session_id, notification_tx)
822                .await
823            {
824                Ok(result) => result,
825                Err(error) => serde_json::json!({ "error": error }),
826            }
827        }
828        "terminal/output" => {
829            let terminal_id = params["terminalId"].as_str().unwrap_or("");
830            match TerminalManager::global().get_output(terminal_id).await {
831                Ok(result) => result,
832                Err(error) => serde_json::json!({ "error": error }),
833            }
834        }
835        "terminal/wait_for_exit" => {
836            let terminal_id = params["terminalId"].as_str().unwrap_or("");
837            match TerminalManager::global().wait_for_exit(terminal_id).await {
838                Ok(result) => result,
839                Err(error) => serde_json::json!({ "error": error }),
840            }
841        }
842        "terminal/kill" => {
843            let terminal_id = params["terminalId"].as_str().unwrap_or("");
844            match TerminalManager::global().kill(terminal_id).await {
845                Ok(_) => serde_json::json!({}),
846                Err(error) => serde_json::json!({ "error": error }),
847            }
848        }
849        "terminal/release" => {
850            let terminal_id = params["terminalId"].as_str().unwrap_or("");
851            TerminalManager::global().release(terminal_id).await;
852            serde_json::json!({})
853        }
854        _ => {
855            tracing::warn!("[AcpProcess] Unknown agent request: {}", method);
856            serde_json::json!({})
857        }
858    }
859}
860
861/// Try to find and parse embedded JSON objects in a line.
862fn try_parse_embedded_json(line: &str) -> Option<serde_json::Value> {
863    let mut depth = 0i32;
864    let mut start = None;
865
866    for (i, ch) in line.char_indices() {
867        match ch {
868            '{' => {
869                if depth == 0 {
870                    start = Some(i);
871                }
872                depth += 1;
873            }
874            '}' => {
875                depth -= 1;
876                if depth == 0 {
877                    if let Some(s) = start {
878                        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line[s..=i]) {
879                            return Some(v);
880                        }
881                    }
882                    start = None;
883                }
884            }
885            _ => {}
886        }
887    }
888    None
889}
890
891/// Build permission approval response following ACP spec and codex-acp expectations.
892/// This mirrors the Next.js implementation in acp-process.ts.
893fn build_permission_approval_result(params: &serde_json::Value) -> serde_json::Value {
894    // Default scope is "turn"
895    let scope = "turn";
896
897    // Try to resolve optionId from params.options array
898    if let Some(option_id) = resolve_permission_option_id(params, scope) {
899        return serde_json::json!({
900            "outcome": {
901                "outcome": "selected",
902                "optionId": option_id
903            }
904        });
905    }
906
907    // Fallback: return cancelled outcome
908    serde_json::json!({
909        "outcome": {
910            "outcome": "cancelled"
911        }
912    })
913}
914
915/// Resolve the appropriate permission optionId from the options array.
916/// Prefers "approved" or "approved-for-session" for turn/session scope.
917fn resolve_permission_option_id(params: &serde_json::Value, scope: &str) -> Option<String> {
918    let options = params.get("options")?.as_array()?;
919
920    // Define preferred option IDs based on scope
921    let preferred_ids = if scope == "session" {
922        vec![
923            "approved-for-session",
924            "approved-always",
925            "approved-execpolicy-amendment",
926            "approved",
927        ]
928    } else {
929        vec![
930            "approved",
931            "approved-once",
932            "approved-for-session",
933            "approved-always",
934            "approved-execpolicy-amendment",
935        ]
936    };
937
938    // Define preferred option kinds
939    let preferred_kinds = if scope == "session" {
940        vec!["allow_always", "allow_once"]
941    } else {
942        vec!["allow_once", "allow_always"]
943    };
944
945    // First, try to find by preferred optionId
946    for pref_id in &preferred_ids {
947        for option in options {
948            if let Some(option_id) = option.get("optionId").and_then(|v| v.as_str()) {
949                if option_id == *pref_id {
950                    return Some(option_id.to_string());
951                }
952            }
953        }
954    }
955
956    // Second, try to find by preferred kind
957    for pref_kind in &preferred_kinds {
958        for option in options {
959            if let Some(kind) = option.get("kind").and_then(|v| v.as_str()) {
960                if kind == *pref_kind {
961                    if let Some(option_id) = option.get("optionId").and_then(|v| v.as_str()) {
962                        return Some(option_id.to_string());
963                    }
964                }
965            }
966        }
967    }
968
969    // Fallback: use first available option or default
970    if let Some(first_option) = options.first() {
971        if let Some(option_id) = first_option.get("optionId").and_then(|v| v.as_str()) {
972            return Some(option_id.to_string());
973        }
974    }
975
976    // Last resort: return default based on scope
977    Some(if scope == "session" {
978        "approved-for-session".to_string()
979    } else {
980        "approved".to_string()
981    })
982}
983
/// Truncate `s` to at most `max_bytes` bytes without splitting a character.
///
/// When `s` already fits it is returned unchanged. Otherwise the cut point is
/// the largest UTF-8 character boundary that is not greater than `max_bytes`,
/// so the result may be shorter than `max_bytes` bytes.
fn truncate_content(s: &str, max_bytes: usize) -> String {
    if max_bytes >= s.len() {
        return s.to_owned();
    }

    // Walk back from `max_bytes` to the nearest boundary; index 0 is always a
    // boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    s[..cut].to_owned()
}
1000
1001fn should_ignore_process_stderr(command: &str, display_name: &str, line: &str) -> bool {
1002    is_codex_process(command, display_name) && is_codex_otel_stderr(line)
1003}
1004
/// Report whether a process is the Codex ACP agent.
///
/// The process counts as Codex when either:
/// - `display_name`, trimmed, equals `"codex"` ignoring ASCII case; or
/// - the file name of `command` (the part after the last `/` or `\`), with an
///   optional `.exe` / `.cmd` extension removed, equals `"codex-acp"`
///   ignoring ASCII case.
///
/// Both path separators and the Windows extensions are accepted so that
/// launches such as `C:\Tools\codex-acp.exe` are recognised as well as
/// `/opt/homebrew/bin/codex-acp`.
fn is_codex_process(command: &str, display_name: &str) -> bool {
    if display_name.trim().eq_ignore_ascii_case("codex") {
        return true;
    }

    let command = command.trim();
    // `rsplit` always yields at least one item, so the fallback is defensive.
    let file_name = command
        .rsplit(|c: char| c == '/' || c == '\\')
        .next()
        .unwrap_or(command);

    // Strip a trailing executable extension, compared case-insensitively.
    // `str::get` returns `None` when the cut would split a character, which
    // keeps the slicing below panic-free for non-ASCII names.
    const EXECUTABLE_EXTENSIONS: [&str; 2] = [".exe", ".cmd"];
    let stem = EXECUTABLE_EXTENSIONS
        .iter()
        .find_map(|ext| {
            let cut = file_name.len().checked_sub(ext.len())?;
            let tail = file_name.get(cut..)?;
            if tail.eq_ignore_ascii_case(ext) {
                Some(&file_name[..cut])
            } else {
                None
            }
        })
        .unwrap_or(file_name);

    stem.eq_ignore_ascii_case("codex-acp")
}
1011
/// Report whether a stderr line is Codex OTEL log output.
///
/// A line matches when it contains either the `codex_otel.log_only:` or the
/// `codex_otel.trace_safe:` marker anywhere in its text.
fn is_codex_otel_stderr(line: &str) -> bool {
    const OTEL_MARKERS: [&str; 2] = ["codex_otel.log_only:", "codex_otel.trace_safe:"];
    OTEL_MARKERS.iter().any(|marker| line.contains(*marker))
}
1016
#[cfg(test)]
mod tests {
    use super::{is_codex_otel_stderr, resolve_permission_option_id, should_ignore_process_stderr};
    use serde_json::json;

    // Shared fixtures for the stderr-filtering tests.
    const CODEX_COMMAND: &str = "/opt/homebrew/bin/codex-acp";
    const CODEX_NAME: &str = "Codex";
    const OTEL_LINE: &str = "INFO ... codex_otel.log_only: event.kind=response.output_text.delta";

    /// OTEL log lines emitted by a Codex process are filtered out.
    #[test]
    fn ignores_codex_otel_stderr_noise() {
        assert!(should_ignore_process_stderr(
            CODEX_COMMAND,
            CODEX_NAME,
            OTEL_LINE
        ));
        assert!(is_codex_otel_stderr(OTEL_LINE));
    }

    /// Other Codex stderr output (e.g. real errors) is kept.
    #[test]
    fn preserves_non_otel_codex_stderr() {
        let error_line = "ERROR codex_api::endpoint::responses_websocket: failed to connect";
        let ignored = should_ignore_process_stderr(CODEX_COMMAND, CODEX_NAME, error_line);
        assert!(!ignored);
    }

    /// OTEL-looking lines from a non-Codex provider are kept.
    #[test]
    fn preserves_other_provider_stderr() {
        let ignored = should_ignore_process_stderr("/usr/bin/opencode", "OpenCode", OTEL_LINE);
        assert!(!ignored);
    }

    /// In the "turn" scope the `approved` option wins over a denial listed first.
    #[test]
    fn resolve_permission_option_id_prefers_turn_approval() {
        let denied = json!({ "optionId": "denied", "kind": "deny_once" });
        let approved = json!({ "optionId": "approved", "kind": "allow_once" });
        let params = json!({ "options": [denied, approved] });

        let chosen = resolve_permission_option_id(&params, "turn");

        assert_eq!(chosen.as_deref(), Some("approved"));
    }
}