1use std::collections::HashMap;
14use std::io::ErrorKind;
15#[cfg(windows)]
16use std::os::windows::process::CommandExt;
17use std::path::Path;
18use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19use std::sync::Arc;
20use std::time::Duration;
21
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::process::{Child, ChildStdin};
24use tokio::sync::{broadcast, oneshot, Mutex};
25
26use super::terminal_manager::TerminalManager;
27#[cfg(windows)]
28use super::CREATE_NO_WINDOW;
29use crate::trace::{
30 Contributor, TraceConversation, TraceEventType, TraceRecord, TraceTool, TraceWriter,
31};
32
/// Broadcast sender used to publish JSON notifications (raw `serde_json::Value`s) to subscribers.
pub type NotificationSender = broadcast::Sender<serde_json::Value>;
35
/// Shared table of in-flight requests: maps a request id to the one-shot sender that receives
/// that request's outcome (the response's `result` value, or an error string).
type PendingMap = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<serde_json::Value, String>>>>>;
38
/// Handle to a spawned agent child process that exchanges newline-delimited JSON-RPC messages
/// over its stdin/stdout.
pub struct AcpProcess {
    /// The child's stdin; shared with the stdout reader task, which writes replies to
    /// agent-initiated requests.
    stdin: Arc<Mutex<ChildStdin>>,
    /// The child handle; `kill` takes it out, leaving `None`.
    child: Arc<Mutex<Option<Child>>>,
    /// Requests that have been sent and are still waiting for a response.
    pending: PendingMap,
    /// Source of request ids; `spawn` starts it at 1.
    next_id: Arc<AtomicU64>,
    /// Cleared when the stdout reader task finishes or when `kill` is called.
    alive: Arc<AtomicBool>,
    /// Channel on which notifications are published.
    notification_tx: NotificationSender,
    /// Name used in log lines and error messages.
    display_name: String,
    /// The command as passed to `spawn` (before path resolution); used to pick default timeouts.
    command: String,
    /// Join handle of the stdout reader task; stored but never awaited in this file.
    _reader_handle: tokio::task::JoinHandle<()>,
}
52
53impl AcpProcess {
54 pub async fn spawn(
59 command: &str,
60 args: &[&str],
61 cwd: &str,
62 notification_tx: NotificationSender,
63 display_name: &str,
64 our_session_id: &str,
65 ) -> Result<Self, String> {
66 tracing::info!(
67 "[AcpProcess:{}] Spawning: {} {} (cwd: {})",
68 display_name,
69 command,
70 args.join(" "),
71 cwd,
72 );
73
74 let cwd_path = Path::new(cwd);
75 if !cwd_path.exists() {
76 return Err(format!(
77 "Invalid session cwd '{}': directory does not exist",
78 cwd
79 ));
80 }
81 if !cwd_path.is_dir() {
82 return Err(format!(
83 "Invalid session cwd '{}': path is not a directory",
84 cwd
85 ));
86 }
87
88 let resolved_command =
91 crate::shell_env::which(command).unwrap_or_else(|| command.to_string());
92
93 let mut command_builder = tokio::process::Command::new(&resolved_command);
94 command_builder
95 .args(args)
96 .current_dir(cwd)
97 .env("PATH", crate::shell_env::full_path())
98 .env("NODE_NO_READLINE", "1")
99 .stdin(std::process::Stdio::piped())
100 .stdout(std::process::Stdio::piped())
101 .stderr(std::process::Stdio::piped());
102
103 #[cfg(windows)]
104 command_builder
105 .as_std_mut()
106 .creation_flags(CREATE_NO_WINDOW);
107
108 if resolved_command.ends_with("codex-acp") && std::env::var_os("RUST_LOG").is_none() {
112 command_builder.env(
113 "RUST_LOG",
114 "info,codex_acp::thread=info,codex_acp::codex_agent=info",
115 );
116 }
117
118 let mut child = command_builder.spawn().map_err(|e| match e.kind() {
119 ErrorKind::NotFound => {
120 let resolved_exists = Path::new(&resolved_command).exists();
121 if resolved_exists {
122 format!(
123 "Failed to execute '{}' (resolved: '{}'): {}. The binary exists, but a required interpreter or wrapper target may be missing.",
124 command, resolved_command, e
125 )
126 } else {
127 format!(
128 "Failed to spawn '{}' (resolved: '{}'): {}. Is it installed and in PATH?",
129 command, resolved_command, e
130 )
131 }
132 }
133 _ => format!(
134 "Failed to spawn '{}' (resolved: '{}') from cwd '{}': {}",
135 command, resolved_command, cwd, e
136 ),
137 })?;
138
139 let stdin = child
140 .stdin
141 .take()
142 .ok_or_else(|| "No stdin on child process".to_string())?;
143 let stdout = child
144 .stdout
145 .take()
146 .ok_or_else(|| "No stdout on child process".to_string())?;
147 let stderr = child.stderr.take();
148
149 let alive = Arc::new(AtomicBool::new(true));
150 let pending: PendingMap = Arc::new(Mutex::new(HashMap::new()));
151 let stdin = Arc::new(Mutex::new(stdin));
152
153 let name = display_name.to_string();
154
155 if let Some(stderr) = stderr {
157 let name_clone = name.clone();
158 let ntx_stderr = notification_tx.clone();
159 let our_sid_stderr = our_session_id.to_string();
160 let resolved_command_stderr = resolved_command.clone();
161 tokio::spawn(async move {
162 let reader = BufReader::new(stderr);
163 let mut lines = reader.lines();
164 while let Ok(Some(line)) = lines.next_line().await {
165 if !line.trim().is_empty() {
166 if should_ignore_process_stderr(
167 &resolved_command_stderr,
168 &name_clone,
169 &line,
170 ) {
171 continue;
172 }
173 tracing::debug!("[AcpProcess:{} stderr] {}", name_clone, line);
174 let notification = serde_json::json!({
176 "jsonrpc": "2.0",
177 "method": "session/update",
178 "params": {
179 "sessionId": our_sid_stderr,
180 "update": {
181 "sessionUpdate": "process_output",
182 "source": "stderr",
183 "data": format!("{}\n", line),
184 "displayName": name_clone,
185 }
186 }
187 });
188 let _ = ntx_stderr.send(notification);
189 }
190 }
191 });
192 }
193
194 let alive_clone = alive.clone();
196 let pending_clone = pending.clone();
197 let ntx = notification_tx.clone();
198 let stdin_clone = stdin.clone();
199 let name_clone = name.clone();
200 let our_sid = our_session_id.to_string();
201 let cwd_clone = cwd.to_string();
202 let provider_clone = display_name.to_string();
203
204 let reader_handle = tokio::spawn(async move {
205 let reader = BufReader::new(stdout);
206 let mut lines = reader.lines();
207 let mut agent_msg_buffer = String::new();
209 let mut agent_thought_buffer = String::new();
211 let mut pending_tool_calls: std::collections::HashMap<String, (String, bool)> =
213 std::collections::HashMap::new();
214
215 while let Ok(Some(line)) = lines.next_line().await {
216 let line = line.trim().to_string();
217 if line.is_empty() {
218 continue;
219 }
220
221 let msg: serde_json::Value = match serde_json::from_str(&line) {
222 Ok(v) => v,
223 Err(_) => {
224 if let Some(v) = try_parse_embedded_json(&line) {
226 v
227 } else {
228 tracing::debug!(
229 "[AcpProcess:{}] Non-JSON stdout: {}",
230 name_clone,
231 truncate_content(&line, 200)
232 );
233 continue;
234 }
235 }
236 };
237
238 let has_id = msg.get("id").is_some() && !msg.get("id").unwrap().is_null();
239 let has_result = msg.get("result").is_some();
240 let has_error = msg.get("error").is_some();
241 let has_method = msg.get("method").and_then(|m| m.as_str()).is_some();
242
243 if has_id && (has_result || has_error) {
244 let id = msg["id"].as_u64().unwrap_or(0);
246 let mut map = pending_clone.lock().await;
247 if let Some(tx) = map.remove(&id) {
248 if has_error {
249 let err_msg =
250 msg["error"]["message"].as_str().unwrap_or("unknown error");
251 let err_code = msg["error"]["code"].as_i64().unwrap_or(0);
252 let _ = tx.send(Err(format!("ACP Error [{}]: {}", err_code, err_msg)));
253 } else {
254 let _ = tx.send(Ok(msg["result"].clone()));
255 }
256 }
257 } else if has_id && has_method {
258 let method = msg["method"].as_str().unwrap_or("");
260 let id_val = msg["id"].clone();
261 tracing::info!(
262 "[AcpProcess:{}] Agent request: {} (id={})",
263 name_clone,
264 method,
265 id_val
266 );
267 let response =
268 handle_agent_request(method, &msg["params"], &our_sid, &ntx).await;
269 let reply = serde_json::json!({
270 "jsonrpc": "2.0",
271 "id": id_val,
272 "result": response,
273 });
274 let data = format!("{}\n", serde_json::to_string(&reply).unwrap());
275 let mut stdin = stdin_clone.lock().await;
276 let _ = stdin.write_all(data.as_bytes()).await;
277 let _ = stdin.flush().await;
278 } else if has_method {
279 let mut rewritten = msg.clone();
283 if let Some(params) = rewritten.get_mut("params") {
284 if params.get("sessionId").is_some() {
285 params["sessionId"] = serde_json::Value::String(our_sid.clone());
286 }
287 }
288
289 if let Some(params) = msg.get("params") {
291 if let Some(update) = params.get("update") {
292 let session_update = update
293 .get("sessionUpdate")
294 .and_then(|v| v.as_str())
295 .unwrap_or("");
296
297 match session_update {
298 "agent_thought_chunk" => {
299 let text = update
301 .get("content")
302 .and_then(|c| c.get("text"))
303 .and_then(|t| t.as_str())
304 .unwrap_or("");
305 agent_thought_buffer.push_str(text);
306 if agent_thought_buffer.len() >= 100 {
308 let record = TraceRecord::new(
309 &our_sid,
310 TraceEventType::AgentThought,
311 Contributor::new(&provider_clone, None),
312 )
313 .with_conversation(TraceConversation {
314 turn: None,
315 role: Some("assistant".to_string()),
316 content_preview: Some(truncate_content(
317 &agent_thought_buffer,
318 200,
319 )),
320 full_content: Some(agent_thought_buffer.clone()),
321 });
322 let writer = TraceWriter::new(&cwd_clone);
323 let _ = writer.append_safe(&record).await;
324 agent_thought_buffer.clear();
325 }
326 }
327 "agent_message_chunk" => {
328 let text = update
330 .get("content")
331 .and_then(|c| c.get("text"))
332 .and_then(|t| t.as_str())
333 .unwrap_or("");
334 agent_msg_buffer.push_str(text);
335 if agent_msg_buffer.len() >= 100 {
337 let record = TraceRecord::new(
338 &our_sid,
339 TraceEventType::AgentMessage,
340 Contributor::new(&provider_clone, None),
341 )
342 .with_conversation(TraceConversation {
343 turn: None,
344 role: Some("assistant".to_string()),
345 content_preview: Some(truncate_content(
346 &agent_msg_buffer,
347 200,
348 )),
349 full_content: Some(agent_msg_buffer.clone()),
350 });
351 let writer = TraceWriter::new(&cwd_clone);
352 let _ = writer.append_safe(&record).await;
353 agent_msg_buffer.clear();
354 }
355 }
356 "agent_message" => {
357 let text = update
359 .get("content")
360 .and_then(|c| c.get("text"))
361 .and_then(|t| t.as_str())
362 .unwrap_or("");
363 let record = TraceRecord::new(
364 &our_sid,
365 TraceEventType::AgentMessage,
366 Contributor::new(&provider_clone, None),
367 )
368 .with_conversation(TraceConversation {
369 turn: None,
370 role: Some("assistant".to_string()),
371 content_preview: Some(truncate_content(text, 200)),
372 full_content: Some(text.to_string()),
373 });
374 let writer = TraceWriter::new(&cwd_clone);
375 let _ = writer.append_safe(&record).await;
376 }
377 "tool_call" => {
378 let tool_call_id =
380 update.get("toolCallId").and_then(|v| v.as_str());
381 let kind = update
382 .get("kind")
383 .and_then(|v| v.as_str())
384 .or_else(|| update.get("title").and_then(|v| v.as_str()))
385 .unwrap_or("unknown");
386 let raw_input = update.get("rawInput").cloned();
387
388 let has_input = raw_input.as_ref().is_some_and(|v| {
390 if let Some(obj) = v.as_object() {
391 !obj.is_empty()
392 } else {
393 !v.is_null()
394 }
395 });
396
397 if has_input {
398 let record = TraceRecord::new(
400 &our_sid,
401 TraceEventType::ToolCall,
402 Contributor::new(&provider_clone, None),
403 )
404 .with_tool(TraceTool {
405 name: kind.to_string(),
406 tool_call_id: tool_call_id.map(|s| s.to_string()),
407 status: Some("running".to_string()),
408 input: raw_input,
409 output: None,
410 });
411 let writer = TraceWriter::new(&cwd_clone);
412 let _ = writer.append_safe(&record).await;
413 } else if let Some(id) = tool_call_id {
414 pending_tool_calls
416 .insert(id.to_string(), (kind.to_string(), false));
417 }
418 }
419 "tool_call_update" => {
420 let tool_call_id =
422 update.get("toolCallId").and_then(|v| v.as_str());
423 let kind = update
424 .get("kind")
425 .and_then(|v| v.as_str())
426 .or_else(|| update.get("title").and_then(|v| v.as_str()))
427 .unwrap_or("unknown");
428 let raw_input = update.get("rawInput").cloned();
429 let raw_output = update
430 .get("rawOutput")
431 .and_then(|v| v.as_str())
432 .map(|s| serde_json::Value::String(s.to_string()))
433 .or_else(|| update.get("rawOutput").cloned());
434 let status = update
435 .get("status")
436 .and_then(|v| v.as_str())
437 .unwrap_or("completed");
438
439 let has_input = raw_input.as_ref().is_some_and(|v| {
441 if let Some(obj) = v.as_object() {
442 !obj.is_empty()
443 } else {
444 !v.is_null()
445 }
446 });
447
448 if let Some(id) = tool_call_id {
449 if let Some((stored_kind, traced)) =
450 pending_tool_calls.get_mut(id)
451 {
452 if has_input && !*traced {
453 let record = TraceRecord::new(
455 &our_sid,
456 TraceEventType::ToolCall,
457 Contributor::new(&provider_clone, None),
458 )
459 .with_tool(TraceTool {
460 name: stored_kind.clone(),
461 tool_call_id: Some(id.to_string()),
462 status: Some("running".to_string()),
463 input: raw_input.clone(),
464 output: None,
465 });
466 let writer = TraceWriter::new(&cwd_clone);
467 let _ = writer.append_safe(&record).await;
468 *traced = true;
469 }
470 }
471 }
472
473 let is_complete = status == "completed"
475 || status == "failed"
476 || raw_output.is_some();
477 if is_complete {
478 let record = TraceRecord::new(
479 &our_sid,
480 TraceEventType::ToolResult,
481 Contributor::new(&provider_clone, None),
482 )
483 .with_tool(TraceTool {
484 name: kind.to_string(),
485 tool_call_id: tool_call_id.map(|s| s.to_string()),
486 status: Some(status.to_string()),
487 input: None,
488 output: raw_output,
489 });
490 let writer = TraceWriter::new(&cwd_clone);
491 let _ = writer.append_safe(&record).await;
492
493 if let Some(id) = tool_call_id {
495 pending_tool_calls.remove(id);
496 }
497 }
498 }
499 _ => {}
500 }
501 }
502 }
503
504 let _ = ntx.send(rewritten);
505 } else {
506 tracing::debug!(
507 "[AcpProcess:{}] Unhandled message: {}",
508 name_clone,
509 truncate_content(&line, 200)
510 );
511 }
512 }
513
514 if !agent_msg_buffer.is_empty() {
516 let record = TraceRecord::new(
517 &our_sid,
518 TraceEventType::AgentMessage,
519 Contributor::new(&provider_clone, None),
520 )
521 .with_conversation(TraceConversation {
522 turn: None,
523 role: Some("assistant".to_string()),
524 content_preview: Some(truncate_content(&agent_msg_buffer, 200)),
525 full_content: Some(agent_msg_buffer.clone()),
526 });
527 let writer = TraceWriter::new(&cwd_clone);
528 let _ = writer.append_safe(&record).await;
529 }
530
531 if !agent_thought_buffer.is_empty() {
533 let record = TraceRecord::new(
534 &our_sid,
535 TraceEventType::AgentThought,
536 Contributor::new(&provider_clone, None),
537 )
538 .with_conversation(TraceConversation {
539 turn: None,
540 role: Some("assistant".to_string()),
541 content_preview: Some(truncate_content(&agent_thought_buffer, 200)),
542 full_content: Some(agent_thought_buffer.clone()),
543 });
544 let writer = TraceWriter::new(&cwd_clone);
545 let _ = writer.append_safe(&record).await;
546 }
547
548 alive_clone.store(false, Ordering::SeqCst);
549 tracing::info!("[AcpProcess:{}] stdout reader finished", name_clone);
550 });
551
552 tokio::time::sleep(Duration::from_millis(300)).await;
554
555 if !alive.load(Ordering::SeqCst) {
556 return Err(format!("{} process died during startup", display_name));
557 }
558
559 tracing::info!("[AcpProcess:{}] Process started", display_name);
560
561 Ok(Self {
562 stdin,
563 child: Arc::new(Mutex::new(Some(child))),
564 pending,
565 next_id: Arc::new(AtomicU64::new(1)),
566 alive,
567 notification_tx,
568 display_name: display_name.to_string(),
569 command: command.to_string(),
570 _reader_handle: reader_handle,
571 })
572 }
573
    /// Returns the current value of the `alive` flag, which is cleared when the stdout
    /// reader task finishes or when `kill` is called.
    pub fn is_alive(&self) -> bool {
        self.alive.load(Ordering::SeqCst)
    }
578
579 pub async fn send_request(
581 &self,
582 method: &str,
583 params: serde_json::Value,
584 timeout_ms: Option<u64>,
585 ) -> Result<serde_json::Value, String> {
586 if !self.is_alive() {
587 return Err(format!("{} process is not alive", self.display_name));
588 }
589
590 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
591 let (tx, rx) = oneshot::channel();
592
593 self.pending.lock().await.insert(id, tx);
594
595 let msg = serde_json::json!({
596 "jsonrpc": "2.0",
597 "id": id,
598 "method": method,
599 "params": params,
600 });
601 let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
602
603 {
604 let mut stdin = self.stdin.lock().await;
605 stdin
606 .write_all(data.as_bytes())
607 .await
608 .map_err(|e| format!("Write {}: {}", method, e))?;
609 stdin
610 .flush()
611 .await
612 .map_err(|e| format!("Flush {}: {}", method, e))?;
613 }
614
615 let is_npx_or_uvx = self.command == "npx" || self.command == "uvx";
618 let default_timeout = match method {
619 "initialize" | "session/new" | "session/load" => {
620 if is_npx_or_uvx {
621 120_000 } else {
623 15_000 }
625 }
626 "session/prompt" => 300_000, _ => 30_000,
628 };
629 let timeout_dur = Duration::from_millis(timeout_ms.unwrap_or(default_timeout));
630
631 match tokio::time::timeout(timeout_dur, rx).await {
632 Ok(Ok(result)) => result,
633 Ok(Err(_)) => Err(format!("Channel closed for {} (id={})", method, id)),
634 Err(_) => {
635 self.pending.lock().await.remove(&id);
636 Err(format!(
637 "Timeout waiting for {} (id={}, {}ms)",
638 method,
639 id,
640 timeout_dur.as_millis()
641 ))
642 }
643 }
644 }
645
    /// Sends the `initialize` request using the default timeout
    /// (delegates to `initialize_with_timeout(None)`).
    pub async fn initialize(&self) -> Result<serde_json::Value, String> {
        self.initialize_with_timeout(None).await
    }
650
651 pub async fn initialize_with_timeout(
653 &self,
654 timeout_ms: Option<u64>,
655 ) -> Result<serde_json::Value, String> {
656 let result = self
657 .send_request(
658 "initialize",
659 serde_json::json!({
660 "protocolVersion": 1,
661 "clientInfo": {
662 "name": "routa-desktop",
663 "version": "0.1.0"
664 }
665 }),
666 timeout_ms,
667 )
668 .await?;
669 tracing::info!(
670 "[AcpProcess:{}] Initialized: {}",
671 self.display_name,
672 serde_json::to_string(&result).unwrap_or_default()
673 );
674 Ok(result)
675 }
676
677 pub async fn new_session(
679 &self,
680 cwd: &str,
681 mcp_servers: &[serde_json::Value],
682 ) -> Result<String, String> {
683 let result = self
684 .send_request(
685 "session/new",
686 serde_json::json!({
687 "cwd": cwd,
688 "mcpServers": mcp_servers
689 }),
690 None,
691 )
692 .await?;
693
694 let session_id = result["sessionId"]
695 .as_str()
696 .ok_or_else(|| "No sessionId in session/new response".to_string())?
697 .to_string();
698
699 tracing::info!(
700 "[AcpProcess:{}] Session created: {}",
701 self.display_name,
702 session_id
703 );
704 Ok(session_id)
705 }
706
707 pub async fn load_session(
709 &self,
710 session_id: &str,
711 cwd: &str,
712 mcp_servers: &[serde_json::Value],
713 ) -> Result<String, String> {
714 let result = self
715 .send_request(
716 "session/load",
717 serde_json::json!({
718 "sessionId": session_id,
719 "cwd": cwd,
720 "mcpServers": mcp_servers,
721 }),
722 None,
723 )
724 .await?;
725
726 let resumed_session_id = result["sessionId"]
727 .as_str()
728 .unwrap_or(session_id)
729 .to_string();
730
731 tracing::info!(
732 "[AcpProcess:{}] Session loaded: {}",
733 self.display_name,
734 resumed_session_id
735 );
736 Ok(resumed_session_id)
737 }
738
739 pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
741 self.send_request(
742 "session/prompt",
743 serde_json::json!({
744 "sessionId": session_id,
745 "prompt": [{ "type": "text", "text": text }]
746 }),
747 Some(300_000),
748 )
749 .await
750 }
751
752 pub async fn cancel(&self, session_id: &str) {
754 let msg = serde_json::json!({
755 "jsonrpc": "2.0",
756 "method": "session/cancel",
757 "params": { "sessionId": session_id }
758 });
759 let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
760 let mut stdin = self.stdin.lock().await;
761 let _ = stdin.write_all(data.as_bytes()).await;
762 let _ = stdin.flush().await;
763 }
764
    /// Returns a reference to the notification sender this process was created with.
    pub fn notification_sender(&self) -> &NotificationSender {
        &self.notification_tx
    }
769
770 pub async fn kill(&self) {
772 self.alive.store(false, Ordering::SeqCst);
773 if let Some(mut child) = self.child.lock().await.take() {
774 tracing::info!("[AcpProcess:{}] Killing process", self.display_name);
775 let _ = child.kill().await;
776 }
777 let mut map = self.pending.lock().await;
779 for (_, tx) in map.drain() {
780 let _ = tx.send(Err("Process killed".to_string()));
781 }
782 }
783}
784
785async fn handle_agent_request(
787 method: &str,
788 params: &serde_json::Value,
789 session_id: &str,
790 notification_tx: &NotificationSender,
791) -> serde_json::Value {
792 match method {
793 "session/request_permission" => {
794 tracing::info!("[AcpProcess] session/request_permission params={}", params);
795 build_permission_approval_result(params)
796 }
797 "fs/read_text_file" => {
798 let path = params["path"].as_str().unwrap_or("");
799 match tokio::fs::read_to_string(path).await {
800 Ok(content) => serde_json::json!({ "content": content }),
801 Err(e) => serde_json::json!({
802 "error": format!("Failed to read file: {}", e)
803 }),
804 }
805 }
806 "fs/write_text_file" => {
807 let path = params["path"].as_str().unwrap_or("");
808 let content = params["content"].as_str().unwrap_or("");
809 if let Some(parent) = std::path::Path::new(path).parent() {
810 let _ = tokio::fs::create_dir_all(parent).await;
811 }
812 match tokio::fs::write(path, content).await {
813 Ok(_) => serde_json::json!({}),
814 Err(e) => serde_json::json!({
815 "error": format!("Failed to write file: {}", e)
816 }),
817 }
818 }
819 "terminal/create" => {
820 match TerminalManager::global()
821 .create(params, session_id, notification_tx)
822 .await
823 {
824 Ok(result) => result,
825 Err(error) => serde_json::json!({ "error": error }),
826 }
827 }
828 "terminal/output" => {
829 let terminal_id = params["terminalId"].as_str().unwrap_or("");
830 match TerminalManager::global().get_output(terminal_id).await {
831 Ok(result) => result,
832 Err(error) => serde_json::json!({ "error": error }),
833 }
834 }
835 "terminal/wait_for_exit" => {
836 let terminal_id = params["terminalId"].as_str().unwrap_or("");
837 match TerminalManager::global().wait_for_exit(terminal_id).await {
838 Ok(result) => result,
839 Err(error) => serde_json::json!({ "error": error }),
840 }
841 }
842 "terminal/kill" => {
843 let terminal_id = params["terminalId"].as_str().unwrap_or("");
844 match TerminalManager::global().kill(terminal_id).await {
845 Ok(_) => serde_json::json!({}),
846 Err(error) => serde_json::json!({ "error": error }),
847 }
848 }
849 "terminal/release" => {
850 let terminal_id = params["terminalId"].as_str().unwrap_or("");
851 TerminalManager::global().release(terminal_id).await;
852 serde_json::json!({})
853 }
854 _ => {
855 tracing::warn!("[AcpProcess] Unknown agent request: {}", method);
856 serde_json::json!({})
857 }
858 }
859}
860
861fn try_parse_embedded_json(line: &str) -> Option<serde_json::Value> {
863 let mut depth = 0i32;
864 let mut start = None;
865
866 for (i, ch) in line.char_indices() {
867 match ch {
868 '{' => {
869 if depth == 0 {
870 start = Some(i);
871 }
872 depth += 1;
873 }
874 '}' => {
875 depth -= 1;
876 if depth == 0 {
877 if let Some(s) = start {
878 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line[s..=i]) {
879 return Some(v);
880 }
881 }
882 start = None;
883 }
884 }
885 _ => {}
886 }
887 }
888 None
889}
890
891fn build_permission_approval_result(params: &serde_json::Value) -> serde_json::Value {
894 let scope = "turn";
896
897 if let Some(option_id) = resolve_permission_option_id(params, scope) {
899 return serde_json::json!({
900 "outcome": {
901 "outcome": "selected",
902 "optionId": option_id
903 }
904 });
905 }
906
907 serde_json::json!({
909 "outcome": {
910 "outcome": "cancelled"
911 }
912 })
913}
914
915fn resolve_permission_option_id(params: &serde_json::Value, scope: &str) -> Option<String> {
918 let options = params.get("options")?.as_array()?;
919
920 let preferred_ids = if scope == "session" {
922 vec![
923 "approved-for-session",
924 "approved-always",
925 "approved-execpolicy-amendment",
926 "approved",
927 ]
928 } else {
929 vec![
930 "approved",
931 "approved-once",
932 "approved-for-session",
933 "approved-always",
934 "approved-execpolicy-amendment",
935 ]
936 };
937
938 let preferred_kinds = if scope == "session" {
940 vec!["allow_always", "allow_once"]
941 } else {
942 vec!["allow_once", "allow_always"]
943 };
944
945 for pref_id in &preferred_ids {
947 for option in options {
948 if let Some(option_id) = option.get("optionId").and_then(|v| v.as_str()) {
949 if option_id == *pref_id {
950 return Some(option_id.to_string());
951 }
952 }
953 }
954 }
955
956 for pref_kind in &preferred_kinds {
958 for option in options {
959 if let Some(kind) = option.get("kind").and_then(|v| v.as_str()) {
960 if kind == *pref_kind {
961 if let Some(option_id) = option.get("optionId").and_then(|v| v.as_str()) {
962 return Some(option_id.to_string());
963 }
964 }
965 }
966 }
967 }
968
969 if let Some(first_option) = options.first() {
971 if let Some(option_id) = first_option.get("optionId").and_then(|v| v.as_str()) {
972 return Some(option_id.to_string());
973 }
974 }
975
976 Some(if scope == "session" {
978 "approved-for-session".to_string()
979 } else {
980 "approved".to_string()
981 })
982}
983
/// Returns `s` shortened to at most `max_bytes` bytes.
///
/// The cut is moved back to the nearest UTF-8 character boundary so the result is always
/// valid; strings that already fit are returned unchanged.
fn truncate_content(s: &str, max_bytes: usize) -> String {
    if s.len() <= max_bytes {
        return s.to_owned();
    }

    // Index 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    s[..cut].to_owned()
}
1000
/// Returns `true` when a stderr line should be dropped: the process is recognised as Codex
/// (`is_codex_process`) and the line matches `is_codex_otel_stderr`.
fn should_ignore_process_stderr(command: &str, display_name: &str, line: &str) -> bool {
    is_codex_process(command, display_name) && is_codex_otel_stderr(line)
}
1004
/// Reports whether a process is the Codex agent.
///
/// True when `display_name` is "codex" (ignoring case and surrounding whitespace), or when
/// the file name of `command` is `codex-acp`. The file name is taken after the last `/` or
/// `\` so Windows-style paths are recognised, compared case-insensitively, and a trailing
/// `.exe`, `.cmd`, or `.bat` is ignored.
fn is_codex_process(command: &str, display_name: &str) -> bool {
    if display_name.trim().eq_ignore_ascii_case("codex") {
        return true;
    }

    let file_name = command
        .trim()
        .rsplit(|c: char| c == '/' || c == '\\')
        .next()
        .unwrap_or("");

    // Strip a known executable extension, matching it case-insensitively.
    let stem = [".exe", ".cmd", ".bat"]
        .iter()
        .find_map(|ext| {
            let split = file_name.len().checked_sub(ext.len())?;
            let head = file_name.get(..split)?;
            let tail = file_name.get(split..)?;
            tail.eq_ignore_ascii_case(ext).then_some(head)
        })
        .unwrap_or(file_name);

    stem.eq_ignore_ascii_case("codex-acp")
}
1011
/// Reports whether a stderr line contains one of the `codex_otel` log markers
/// (`codex_otel.log_only:` or `codex_otel.trace_safe:`).
fn is_codex_otel_stderr(line: &str) -> bool {
    const OTEL_MARKERS: [&str; 2] = ["codex_otel.log_only:", "codex_otel.trace_safe:"];

    // The markers contain no whitespace, so trimming does not affect the outcome.
    let candidate = line.trim();
    OTEL_MARKERS.iter().any(|marker| candidate.contains(marker))
}
1016
/// Unit tests for the stderr filtering helpers and permission option selection.
#[cfg(test)]
mod tests {
    use super::{is_codex_otel_stderr, resolve_permission_option_id, should_ignore_process_stderr};
    use serde_json::json;

    /// OTEL log lines from a Codex process are filtered out.
    #[test]
    fn ignores_codex_otel_stderr_noise() {
        let line = "INFO ... codex_otel.log_only: event.kind=response.output_text.delta";
        assert!(should_ignore_process_stderr(
            "/opt/homebrew/bin/codex-acp",
            "Codex",
            line
        ));
        assert!(is_codex_otel_stderr(line));
    }

    /// Other stderr output from a Codex process is kept.
    #[test]
    fn preserves_non_otel_codex_stderr() {
        let line = "ERROR codex_api::endpoint::responses_websocket: failed to connect";
        assert!(!should_ignore_process_stderr(
            "/opt/homebrew/bin/codex-acp",
            "Codex",
            line
        ));
    }

    /// OTEL-looking lines from a different provider are kept.
    #[test]
    fn preserves_other_provider_stderr() {
        let line = "INFO ... codex_otel.log_only: event.kind=response.output_text.delta";
        assert!(!should_ignore_process_stderr(
            "/usr/bin/opencode",
            "OpenCode",
            line
        ));
    }

    /// With the "turn" scope, the `approved` option wins even when it is not listed first.
    #[test]
    fn resolve_permission_option_id_prefers_turn_approval() {
        let params = json!({
            "options": [
                { "optionId": "denied", "kind": "deny_once" },
                { "optionId": "approved", "kind": "allow_once" }
            ]
        });

        assert_eq!(
            resolve_permission_option_id(&params, "turn").as_deref(),
            Some("approved")
        );
    }
}