1use std::collections::HashMap;
14use std::io::ErrorKind;
15#[cfg(windows)]
16use std::os::windows::process::CommandExt;
17use std::path::Path;
18use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19use std::sync::Arc;
20use std::time::Duration;
21
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::process::{Child, ChildStdin};
24use tokio::sync::{broadcast, oneshot, Mutex};
25
26use super::terminal_manager::TerminalManager;
27#[cfg(windows)]
28use super::CREATE_NO_WINDOW;
29use crate::trace::{
30 Contributor, TraceConversation, TraceEventType, TraceRecord, TraceTool, TraceWriter,
31};
32
33pub type NotificationSender = broadcast::Sender<serde_json::Value>;
35
36type PendingMap = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<serde_json::Value, String>>>>>;
38
39pub struct AcpProcess {
41 stdin: Arc<Mutex<ChildStdin>>,
42 child: Arc<Mutex<Option<Child>>>,
43 pending: PendingMap,
44 next_id: Arc<AtomicU64>,
45 alive: Arc<AtomicBool>,
46 notification_tx: NotificationSender,
47 display_name: String,
48 command: String,
50 _reader_handle: tokio::task::JoinHandle<()>,
51}
52
53impl AcpProcess {
54 pub async fn spawn(
59 command: &str,
60 args: &[&str],
61 cwd: &str,
62 notification_tx: NotificationSender,
63 display_name: &str,
64 our_session_id: &str,
65 ) -> Result<Self, String> {
66 tracing::info!(
67 "[AcpProcess:{}] Spawning: {} {} (cwd: {})",
68 display_name,
69 command,
70 args.join(" "),
71 cwd,
72 );
73
74 let cwd_path = Path::new(cwd);
75 if !cwd_path.exists() {
76 return Err(format!(
77 "Invalid session cwd '{cwd}': directory does not exist"
78 ));
79 }
80 if !cwd_path.is_dir() {
81 return Err(format!(
82 "Invalid session cwd '{cwd}': path is not a directory"
83 ));
84 }
85
86 let resolved_command =
89 crate::shell_env::which(command).unwrap_or_else(|| command.to_string());
90
91 let mut command_builder = tokio::process::Command::new(&resolved_command);
92 command_builder
93 .args(args)
94 .current_dir(cwd)
95 .env("PATH", crate::shell_env::full_path())
96 .env("NODE_NO_READLINE", "1")
97 .stdin(std::process::Stdio::piped())
98 .stdout(std::process::Stdio::piped())
99 .stderr(std::process::Stdio::piped());
100
101 #[cfg(windows)]
102 command_builder
103 .as_std_mut()
104 .creation_flags(CREATE_NO_WINDOW);
105
106 if resolved_command.ends_with("codex-acp") && std::env::var_os("RUST_LOG").is_none() {
110 command_builder.env(
111 "RUST_LOG",
112 "info,codex_acp::thread=info,codex_acp::codex_agent=info",
113 );
114 }
115
116 let mut child = command_builder.spawn().map_err(|e| match e.kind() {
117 ErrorKind::NotFound => {
118 let resolved_exists = Path::new(&resolved_command).exists();
119 if resolved_exists {
120 format!(
121 "Failed to execute '{command}' (resolved: '{resolved_command}'): {e}. The binary exists, but a required interpreter or wrapper target may be missing."
122 )
123 } else {
124 format!(
125 "Failed to spawn '{command}' (resolved: '{resolved_command}'): {e}. Is it installed and in PATH?"
126 )
127 }
128 }
129 _ => format!(
130 "Failed to spawn '{command}' (resolved: '{resolved_command}') from cwd '{cwd}': {e}"
131 ),
132 })?;
133
134 let stdin = child
135 .stdin
136 .take()
137 .ok_or_else(|| "No stdin on child process".to_string())?;
138 let stdout = child
139 .stdout
140 .take()
141 .ok_or_else(|| "No stdout on child process".to_string())?;
142 let stderr = child.stderr.take();
143
144 let alive = Arc::new(AtomicBool::new(true));
145 let pending: PendingMap = Arc::new(Mutex::new(HashMap::new()));
146 let stdin = Arc::new(Mutex::new(stdin));
147
148 let name = display_name.to_string();
149
150 if let Some(stderr) = stderr {
152 let name_clone = name.clone();
153 let ntx_stderr = notification_tx.clone();
154 let our_sid_stderr = our_session_id.to_string();
155 let resolved_command_stderr = resolved_command.clone();
156 tokio::spawn(async move {
157 let reader = BufReader::new(stderr);
158 let mut lines = reader.lines();
159 while let Ok(Some(line)) = lines.next_line().await {
160 if !line.trim().is_empty() {
161 if should_ignore_process_stderr(
162 &resolved_command_stderr,
163 &name_clone,
164 &line,
165 ) {
166 continue;
167 }
168 tracing::debug!("[AcpProcess:{} stderr] {}", name_clone, line);
169 let notification = serde_json::json!({
171 "jsonrpc": "2.0",
172 "method": "session/update",
173 "params": {
174 "sessionId": our_sid_stderr,
175 "update": {
176 "sessionUpdate": "process_output",
177 "source": "stderr",
178 "data": format!("{}\n", line),
179 "displayName": name_clone,
180 }
181 }
182 });
183 let _ = ntx_stderr.send(notification);
184 }
185 }
186 });
187 }
188
189 let alive_clone = alive.clone();
191 let pending_clone = pending.clone();
192 let ntx = notification_tx.clone();
193 let stdin_clone = stdin.clone();
194 let name_clone = name.clone();
195 let our_sid = our_session_id.to_string();
196 let cwd_clone = cwd.to_string();
197 let provider_clone = display_name.to_string();
198
199 let reader_handle = tokio::spawn(async move {
200 let reader = BufReader::new(stdout);
201 let mut lines = reader.lines();
202 let mut agent_msg_buffer = String::new();
204 let mut agent_thought_buffer = String::new();
206 let mut pending_tool_calls: std::collections::HashMap<String, (String, bool)> =
208 std::collections::HashMap::new();
209
210 while let Ok(Some(line)) = lines.next_line().await {
211 let line = line.trim().to_string();
212 if line.is_empty() {
213 continue;
214 }
215
216 let msg: serde_json::Value = match serde_json::from_str(&line) {
217 Ok(v) => v,
218 Err(_) => {
219 if let Some(v) = try_parse_embedded_json(&line) {
221 v
222 } else {
223 tracing::debug!(
224 "[AcpProcess:{}] Non-JSON stdout: {}",
225 name_clone,
226 truncate_content(&line, 200)
227 );
228 continue;
229 }
230 }
231 };
232
233 let has_id = msg.get("id").is_some() && !msg.get("id").unwrap().is_null();
234 let has_result = msg.get("result").is_some();
235 let has_error = msg.get("error").is_some();
236 let has_method = msg.get("method").and_then(|m| m.as_str()).is_some();
237
238 if has_id && (has_result || has_error) {
239 let id = msg["id"].as_u64().unwrap_or(0);
241 let mut map = pending_clone.lock().await;
242 if let Some(tx) = map.remove(&id) {
243 if has_error {
244 let err_msg =
245 msg["error"]["message"].as_str().unwrap_or("unknown error");
246 let err_code = msg["error"]["code"].as_i64().unwrap_or(0);
247 let _ = tx.send(Err(format!("ACP Error [{err_code}]: {err_msg}")));
248 } else {
249 let _ = tx.send(Ok(msg["result"].clone()));
250 }
251 }
252 } else if has_id && has_method {
253 let method = msg["method"].as_str().unwrap_or("");
255 let id_val = msg["id"].clone();
256 tracing::info!(
257 "[AcpProcess:{}] Agent request: {} (id={})",
258 name_clone,
259 method,
260 id_val
261 );
262 let response =
263 handle_agent_request(method, &msg["params"], &our_sid, &ntx).await;
264 let reply = serde_json::json!({
265 "jsonrpc": "2.0",
266 "id": id_val,
267 "result": response,
268 });
269 let data = format!("{}\n", serde_json::to_string(&reply).unwrap());
270 let mut stdin = stdin_clone.lock().await;
271 let _ = stdin.write_all(data.as_bytes()).await;
272 let _ = stdin.flush().await;
273 } else if has_method {
274 let mut rewritten = msg.clone();
278 if let Some(params) = rewritten.get_mut("params") {
279 if params.get("sessionId").is_some() {
280 params["sessionId"] = serde_json::Value::String(our_sid.clone());
281 }
282 }
283
284 if let Some(params) = msg.get("params") {
286 if let Some(update) = params.get("update") {
287 let session_update = update
288 .get("sessionUpdate")
289 .and_then(|v| v.as_str())
290 .unwrap_or("");
291
292 match session_update {
293 "agent_thought_chunk" => {
294 let text = update
296 .get("content")
297 .and_then(|c| c.get("text"))
298 .and_then(|t| t.as_str())
299 .unwrap_or("");
300 agent_thought_buffer.push_str(text);
301 if agent_thought_buffer.len() >= 100 {
303 let record = TraceRecord::new(
304 &our_sid,
305 TraceEventType::AgentThought,
306 Contributor::new(&provider_clone, None),
307 )
308 .with_conversation(TraceConversation {
309 turn: None,
310 role: Some("assistant".to_string()),
311 content_preview: Some(truncate_content(
312 &agent_thought_buffer,
313 200,
314 )),
315 full_content: Some(agent_thought_buffer.clone()),
316 });
317 let writer = TraceWriter::new(&cwd_clone);
318 let _ = writer.append_safe(&record).await;
319 agent_thought_buffer.clear();
320 }
321 }
322 "agent_message_chunk" => {
323 let text = update
325 .get("content")
326 .and_then(|c| c.get("text"))
327 .and_then(|t| t.as_str())
328 .unwrap_or("");
329 agent_msg_buffer.push_str(text);
330 if agent_msg_buffer.len() >= 100 {
332 let record = TraceRecord::new(
333 &our_sid,
334 TraceEventType::AgentMessage,
335 Contributor::new(&provider_clone, None),
336 )
337 .with_conversation(TraceConversation {
338 turn: None,
339 role: Some("assistant".to_string()),
340 content_preview: Some(truncate_content(
341 &agent_msg_buffer,
342 200,
343 )),
344 full_content: Some(agent_msg_buffer.clone()),
345 });
346 let writer = TraceWriter::new(&cwd_clone);
347 let _ = writer.append_safe(&record).await;
348 agent_msg_buffer.clear();
349 }
350 }
351 "agent_message" => {
352 let text = update
354 .get("content")
355 .and_then(|c| c.get("text"))
356 .and_then(|t| t.as_str())
357 .unwrap_or("");
358 let record = TraceRecord::new(
359 &our_sid,
360 TraceEventType::AgentMessage,
361 Contributor::new(&provider_clone, None),
362 )
363 .with_conversation(TraceConversation {
364 turn: None,
365 role: Some("assistant".to_string()),
366 content_preview: Some(truncate_content(text, 200)),
367 full_content: Some(text.to_string()),
368 });
369 let writer = TraceWriter::new(&cwd_clone);
370 let _ = writer.append_safe(&record).await;
371 }
372 "tool_call" => {
373 let tool_call_id =
375 update.get("toolCallId").and_then(|v| v.as_str());
376 let kind = update
377 .get("kind")
378 .and_then(|v| v.as_str())
379 .or_else(|| update.get("title").and_then(|v| v.as_str()))
380 .unwrap_or("unknown");
381 let raw_input = update.get("rawInput").cloned();
382
383 let has_input = raw_input.as_ref().is_some_and(|v| {
385 if let Some(obj) = v.as_object() {
386 !obj.is_empty()
387 } else {
388 !v.is_null()
389 }
390 });
391
392 if has_input {
393 let record = TraceRecord::new(
395 &our_sid,
396 TraceEventType::ToolCall,
397 Contributor::new(&provider_clone, None),
398 )
399 .with_tool(TraceTool {
400 name: kind.to_string(),
401 tool_call_id: tool_call_id.map(|s| s.to_string()),
402 status: Some("running".to_string()),
403 input: raw_input,
404 output: None,
405 });
406 let writer = TraceWriter::new(&cwd_clone);
407 let _ = writer.append_safe(&record).await;
408 } else if let Some(id) = tool_call_id {
409 pending_tool_calls
411 .insert(id.to_string(), (kind.to_string(), false));
412 }
413 }
414 "tool_call_update" => {
415 let tool_call_id =
417 update.get("toolCallId").and_then(|v| v.as_str());
418 let kind = update
419 .get("kind")
420 .and_then(|v| v.as_str())
421 .or_else(|| update.get("title").and_then(|v| v.as_str()))
422 .unwrap_or("unknown");
423 let raw_input = update.get("rawInput").cloned();
424 let raw_output = update
425 .get("rawOutput")
426 .and_then(|v| v.as_str())
427 .map(|s| serde_json::Value::String(s.to_string()))
428 .or_else(|| update.get("rawOutput").cloned());
429 let status = update
430 .get("status")
431 .and_then(|v| v.as_str())
432 .unwrap_or("completed");
433
434 let has_input = raw_input.as_ref().is_some_and(|v| {
436 if let Some(obj) = v.as_object() {
437 !obj.is_empty()
438 } else {
439 !v.is_null()
440 }
441 });
442
443 if let Some(id) = tool_call_id {
444 if let Some((stored_kind, traced)) =
445 pending_tool_calls.get_mut(id)
446 {
447 if has_input && !*traced {
448 let record = TraceRecord::new(
450 &our_sid,
451 TraceEventType::ToolCall,
452 Contributor::new(&provider_clone, None),
453 )
454 .with_tool(TraceTool {
455 name: stored_kind.clone(),
456 tool_call_id: Some(id.to_string()),
457 status: Some("running".to_string()),
458 input: raw_input.clone(),
459 output: None,
460 });
461 let writer = TraceWriter::new(&cwd_clone);
462 let _ = writer.append_safe(&record).await;
463 *traced = true;
464 }
465 }
466 }
467
468 let is_complete = status == "completed"
470 || status == "failed"
471 || raw_output.is_some();
472 if is_complete {
473 let record = TraceRecord::new(
474 &our_sid,
475 TraceEventType::ToolResult,
476 Contributor::new(&provider_clone, None),
477 )
478 .with_tool(TraceTool {
479 name: kind.to_string(),
480 tool_call_id: tool_call_id.map(|s| s.to_string()),
481 status: Some(status.to_string()),
482 input: None,
483 output: raw_output,
484 });
485 let writer = TraceWriter::new(&cwd_clone);
486 let _ = writer.append_safe(&record).await;
487
488 if let Some(id) = tool_call_id {
490 pending_tool_calls.remove(id);
491 }
492 }
493 }
494 _ => {}
495 }
496 }
497 }
498
499 let _ = ntx.send(rewritten);
500 } else {
501 tracing::debug!(
502 "[AcpProcess:{}] Unhandled message: {}",
503 name_clone,
504 truncate_content(&line, 200)
505 );
506 }
507 }
508
509 if !agent_msg_buffer.is_empty() {
511 let record = TraceRecord::new(
512 &our_sid,
513 TraceEventType::AgentMessage,
514 Contributor::new(&provider_clone, None),
515 )
516 .with_conversation(TraceConversation {
517 turn: None,
518 role: Some("assistant".to_string()),
519 content_preview: Some(truncate_content(&agent_msg_buffer, 200)),
520 full_content: Some(agent_msg_buffer.clone()),
521 });
522 let writer = TraceWriter::new(&cwd_clone);
523 let _ = writer.append_safe(&record).await;
524 }
525
526 if !agent_thought_buffer.is_empty() {
528 let record = TraceRecord::new(
529 &our_sid,
530 TraceEventType::AgentThought,
531 Contributor::new(&provider_clone, None),
532 )
533 .with_conversation(TraceConversation {
534 turn: None,
535 role: Some("assistant".to_string()),
536 content_preview: Some(truncate_content(&agent_thought_buffer, 200)),
537 full_content: Some(agent_thought_buffer.clone()),
538 });
539 let writer = TraceWriter::new(&cwd_clone);
540 let _ = writer.append_safe(&record).await;
541 }
542
543 alive_clone.store(false, Ordering::SeqCst);
544 tracing::info!("[AcpProcess:{}] stdout reader finished", name_clone);
545 });
546
547 tokio::time::sleep(Duration::from_millis(300)).await;
549
550 if !alive.load(Ordering::SeqCst) {
551 return Err(format!("{display_name} process died during startup"));
552 }
553
554 tracing::info!("[AcpProcess:{}] Process started", display_name);
555
556 Ok(Self {
557 stdin,
558 child: Arc::new(Mutex::new(Some(child))),
559 pending,
560 next_id: Arc::new(AtomicU64::new(1)),
561 alive,
562 notification_tx,
563 display_name: display_name.to_string(),
564 command: command.to_string(),
565 _reader_handle: reader_handle,
566 })
567 }
568
569 pub fn is_alive(&self) -> bool {
571 self.alive.load(Ordering::SeqCst)
572 }
573
574 pub async fn send_request(
576 &self,
577 method: &str,
578 params: serde_json::Value,
579 timeout_ms: Option<u64>,
580 ) -> Result<serde_json::Value, String> {
581 if !self.is_alive() {
582 return Err(format!("{} process is not alive", self.display_name));
583 }
584
585 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
586 let (tx, rx) = oneshot::channel();
587
588 self.pending.lock().await.insert(id, tx);
589
590 let msg = serde_json::json!({
591 "jsonrpc": "2.0",
592 "id": id,
593 "method": method,
594 "params": params,
595 });
596 let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
597
598 {
599 let mut stdin = self.stdin.lock().await;
600 stdin
601 .write_all(data.as_bytes())
602 .await
603 .map_err(|e| format!("Write {method}: {e}"))?;
604 stdin
605 .flush()
606 .await
607 .map_err(|e| format!("Flush {method}: {e}"))?;
608 }
609
610 let is_npx_or_uvx = self.command == "npx" || self.command == "uvx";
613 let default_timeout = match method {
614 "initialize" | "session/new" | "session/load" => {
615 if is_npx_or_uvx {
616 120_000 } else {
618 15_000 }
620 }
621 "session/prompt" => 300_000, _ => 30_000,
623 };
624 let timeout_dur = Duration::from_millis(timeout_ms.unwrap_or(default_timeout));
625
626 match tokio::time::timeout(timeout_dur, rx).await {
627 Ok(Ok(result)) => result,
628 Ok(Err(_)) => Err(format!("Channel closed for {method} (id={id})")),
629 Err(_) => {
630 self.pending.lock().await.remove(&id);
631 Err(format!(
632 "Timeout waiting for {} (id={}, {}ms)",
633 method,
634 id,
635 timeout_dur.as_millis()
636 ))
637 }
638 }
639 }
640
641 pub async fn initialize(&self) -> Result<serde_json::Value, String> {
643 self.initialize_with_timeout(None).await
644 }
645
646 pub async fn initialize_with_timeout(
648 &self,
649 timeout_ms: Option<u64>,
650 ) -> Result<serde_json::Value, String> {
651 let result = self
652 .send_request(
653 "initialize",
654 serde_json::json!({
655 "protocolVersion": 1,
656 "clientInfo": {
657 "name": "routa-desktop",
658 "version": "0.1.0"
659 }
660 }),
661 timeout_ms,
662 )
663 .await?;
664 tracing::info!(
665 "[AcpProcess:{}] Initialized: {}",
666 self.display_name,
667 serde_json::to_string(&result).unwrap_or_default()
668 );
669 Ok(result)
670 }
671
672 pub async fn new_session(
674 &self,
675 cwd: &str,
676 mcp_servers: &[serde_json::Value],
677 ) -> Result<String, String> {
678 let result = self
679 .send_request(
680 "session/new",
681 serde_json::json!({
682 "cwd": cwd,
683 "mcpServers": mcp_servers
684 }),
685 None,
686 )
687 .await?;
688
689 let session_id = result["sessionId"]
690 .as_str()
691 .ok_or_else(|| "No sessionId in session/new response".to_string())?
692 .to_string();
693
694 tracing::info!(
695 "[AcpProcess:{}] Session created: {}",
696 self.display_name,
697 session_id
698 );
699 Ok(session_id)
700 }
701
702 pub async fn load_session(
704 &self,
705 session_id: &str,
706 cwd: &str,
707 mcp_servers: &[serde_json::Value],
708 ) -> Result<String, String> {
709 let result = self
710 .send_request(
711 "session/load",
712 serde_json::json!({
713 "sessionId": session_id,
714 "cwd": cwd,
715 "mcpServers": mcp_servers,
716 }),
717 None,
718 )
719 .await?;
720
721 let resumed_session_id = result["sessionId"]
722 .as_str()
723 .unwrap_or(session_id)
724 .to_string();
725
726 tracing::info!(
727 "[AcpProcess:{}] Session loaded: {}",
728 self.display_name,
729 resumed_session_id
730 );
731 Ok(resumed_session_id)
732 }
733
734 pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
736 self.send_request(
737 "session/prompt",
738 serde_json::json!({
739 "sessionId": session_id,
740 "prompt": [{ "type": "text", "text": text }]
741 }),
742 Some(300_000),
743 )
744 .await
745 }
746
747 pub async fn cancel(&self, session_id: &str) {
749 let msg = serde_json::json!({
750 "jsonrpc": "2.0",
751 "method": "session/cancel",
752 "params": { "sessionId": session_id }
753 });
754 let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
755 let mut stdin = self.stdin.lock().await;
756 let _ = stdin.write_all(data.as_bytes()).await;
757 let _ = stdin.flush().await;
758 }
759
760 pub fn notification_sender(&self) -> &NotificationSender {
762 &self.notification_tx
763 }
764
765 pub async fn kill(&self) {
767 self.alive.store(false, Ordering::SeqCst);
768 if let Some(mut child) = self.child.lock().await.take() {
769 tracing::info!("[AcpProcess:{}] Killing process", self.display_name);
770 let _ = child.kill().await;
771 }
772 let mut map = self.pending.lock().await;
774 for (_, tx) in map.drain() {
775 let _ = tx.send(Err("Process killed".to_string()));
776 }
777 }
778}
779
780async fn handle_agent_request(
782 method: &str,
783 params: &serde_json::Value,
784 session_id: &str,
785 notification_tx: &NotificationSender,
786) -> serde_json::Value {
787 match method {
788 "session/request_permission" => {
789 tracing::info!("[AcpProcess] session/request_permission params={}", params);
790 build_permission_approval_result(params)
791 }
792 "fs/read_text_file" => {
793 let path = params["path"].as_str().unwrap_or("");
794 match tokio::fs::read_to_string(path).await {
795 Ok(content) => serde_json::json!({ "content": content }),
796 Err(e) => serde_json::json!({
797 "error": format!("Failed to read file: {}", e)
798 }),
799 }
800 }
801 "fs/write_text_file" => {
802 let path = params["path"].as_str().unwrap_or("");
803 let content = params["content"].as_str().unwrap_or("");
804 if let Some(parent) = std::path::Path::new(path).parent() {
805 let _ = tokio::fs::create_dir_all(parent).await;
806 }
807 match tokio::fs::write(path, content).await {
808 Ok(_) => serde_json::json!({}),
809 Err(e) => serde_json::json!({
810 "error": format!("Failed to write file: {}", e)
811 }),
812 }
813 }
814 "terminal/create" => {
815 match TerminalManager::global()
816 .create(params, session_id, notification_tx)
817 .await
818 {
819 Ok(result) => result,
820 Err(error) => serde_json::json!({ "error": error }),
821 }
822 }
823 "terminal/output" => {
824 let terminal_id = params["terminalId"].as_str().unwrap_or("");
825 match TerminalManager::global().get_output(terminal_id).await {
826 Ok(result) => result,
827 Err(error) => serde_json::json!({ "error": error }),
828 }
829 }
830 "terminal/wait_for_exit" => {
831 let terminal_id = params["terminalId"].as_str().unwrap_or("");
832 match TerminalManager::global().wait_for_exit(terminal_id).await {
833 Ok(result) => result,
834 Err(error) => serde_json::json!({ "error": error }),
835 }
836 }
837 "terminal/kill" => {
838 let terminal_id = params["terminalId"].as_str().unwrap_or("");
839 match TerminalManager::global().kill(terminal_id).await {
840 Ok(_) => serde_json::json!({}),
841 Err(error) => serde_json::json!({ "error": error }),
842 }
843 }
844 "terminal/release" => {
845 let terminal_id = params["terminalId"].as_str().unwrap_or("");
846 TerminalManager::global().release(terminal_id).await;
847 serde_json::json!({})
848 }
849 _ => {
850 tracing::warn!("[AcpProcess] Unknown agent request: {}", method);
851 serde_json::json!({})
852 }
853 }
854}
855
856fn try_parse_embedded_json(line: &str) -> Option<serde_json::Value> {
858 let mut depth = 0i32;
859 let mut start = None;
860
861 for (i, ch) in line.char_indices() {
862 match ch {
863 '{' => {
864 if depth == 0 {
865 start = Some(i);
866 }
867 depth += 1;
868 }
869 '}' => {
870 depth -= 1;
871 if depth == 0 {
872 if let Some(s) = start {
873 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line[s..=i]) {
874 return Some(v);
875 }
876 }
877 start = None;
878 }
879 }
880 _ => {}
881 }
882 }
883 None
884}
885
886fn build_permission_approval_result(params: &serde_json::Value) -> serde_json::Value {
889 let scope = "turn";
891
892 if let Some(option_id) = resolve_permission_option_id(params, scope) {
894 return serde_json::json!({
895 "outcome": {
896 "outcome": "selected",
897 "optionId": option_id
898 }
899 });
900 }
901
902 serde_json::json!({
904 "outcome": {
905 "outcome": "cancelled"
906 }
907 })
908}
909
910fn resolve_permission_option_id(params: &serde_json::Value, scope: &str) -> Option<String> {
913 let options = params.get("options")?.as_array()?;
914
915 let preferred_ids = if scope == "session" {
917 vec![
918 "approved-for-session",
919 "approved-always",
920 "approved-execpolicy-amendment",
921 "approved",
922 ]
923 } else {
924 vec![
925 "approved",
926 "approved-once",
927 "approved-for-session",
928 "approved-always",
929 "approved-execpolicy-amendment",
930 ]
931 };
932
933 let preferred_kinds = if scope == "session" {
935 vec!["allow_always", "allow_once"]
936 } else {
937 vec!["allow_once", "allow_always"]
938 };
939
940 for pref_id in &preferred_ids {
942 for option in options {
943 if let Some(option_id) = option.get("optionId").and_then(|v| v.as_str()) {
944 if option_id == *pref_id {
945 return Some(option_id.to_string());
946 }
947 }
948 }
949 }
950
951 for pref_kind in &preferred_kinds {
953 for option in options {
954 if let Some(kind) = option.get("kind").and_then(|v| v.as_str()) {
955 if kind == *pref_kind {
956 if let Some(option_id) = option.get("optionId").and_then(|v| v.as_str()) {
957 return Some(option_id.to_string());
958 }
959 }
960 }
961 }
962 }
963
964 if let Some(first_option) = options.first() {
966 if let Some(option_id) = first_option.get("optionId").and_then(|v| v.as_str()) {
967 return Some(option_id.to_string());
968 }
969 }
970
971 Some(if scope == "session" {
973 "approved-for-session".to_string()
974 } else {
975 "approved".to_string()
976 })
977}
978
979fn truncate_content(s: &str, max_bytes: usize) -> String {
983 if s.len() <= max_bytes {
984 return s.to_string();
985 }
986
987 let mut end = max_bytes;
989 while end > 0 && !s.is_char_boundary(end) {
990 end -= 1;
991 }
992
993 s[..end].to_string()
994}
995
996fn should_ignore_process_stderr(command: &str, display_name: &str, line: &str) -> bool {
997 is_codex_process(command, display_name) && is_codex_otel_stderr(line)
998}
999
1000fn is_codex_process(command: &str, display_name: &str) -> bool {
1001 let trimmed_command = command.trim();
1002 display_name.trim().eq_ignore_ascii_case("codex")
1003 || trimmed_command.eq_ignore_ascii_case("codex-acp")
1004 || trimmed_command.ends_with("/codex-acp")
1005}
1006
1007fn is_codex_otel_stderr(line: &str) -> bool {
1008 let trimmed = line.trim();
1009 trimmed.contains("codex_otel.log_only:") || trimmed.contains("codex_otel.trace_safe:")
1010}
1011
1012#[cfg(test)]
1013mod tests {
1014 use super::{is_codex_otel_stderr, resolve_permission_option_id, should_ignore_process_stderr};
1015 use serde_json::json;
1016
1017 #[test]
1018 fn ignores_codex_otel_stderr_noise() {
1019 let line = "INFO ... codex_otel.log_only: event.kind=response.output_text.delta";
1020 assert!(should_ignore_process_stderr(
1021 "/opt/homebrew/bin/codex-acp",
1022 "Codex",
1023 line
1024 ));
1025 assert!(is_codex_otel_stderr(line));
1026 }
1027
1028 #[test]
1029 fn preserves_non_otel_codex_stderr() {
1030 let line = "ERROR codex_api::endpoint::responses_websocket: failed to connect";
1031 assert!(!should_ignore_process_stderr(
1032 "/opt/homebrew/bin/codex-acp",
1033 "Codex",
1034 line
1035 ));
1036 }
1037
1038 #[test]
1039 fn preserves_other_provider_stderr() {
1040 let line = "INFO ... codex_otel.log_only: event.kind=response.output_text.delta";
1041 assert!(!should_ignore_process_stderr(
1042 "/usr/bin/opencode",
1043 "OpenCode",
1044 line
1045 ));
1046 }
1047
1048 #[test]
1049 fn resolve_permission_option_id_prefers_turn_approval() {
1050 let params = json!({
1051 "options": [
1052 { "optionId": "denied", "kind": "deny_once" },
1053 { "optionId": "approved", "kind": "allow_once" }
1054 ]
1055 });
1056
1057 assert_eq!(
1058 resolve_permission_option_id(¶ms, "turn").as_deref(),
1059 Some("approved")
1060 );
1061 }
1062}