1use std::collections::HashMap;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
19use tokio::process::{Child, ChildStdin};
20use tokio::sync::{broadcast, oneshot, Mutex};
21
22use super::terminal_manager::TerminalManager;
23use crate::trace::{
24 Contributor, TraceConversation, TraceEventType, TraceRecord, TraceTool, TraceWriter,
25};
26
27pub type NotificationSender = broadcast::Sender<serde_json::Value>;
29
30type PendingMap = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<serde_json::Value, String>>>>>;
32
33pub struct AcpProcess {
35 stdin: Arc<Mutex<ChildStdin>>,
36 child: Arc<Mutex<Option<Child>>>,
37 pending: PendingMap,
38 next_id: Arc<AtomicU64>,
39 alive: Arc<AtomicBool>,
40 notification_tx: NotificationSender,
41 display_name: String,
42 command: String,
44 _reader_handle: tokio::task::JoinHandle<()>,
45}
46
47impl AcpProcess {
48 pub async fn spawn(
53 command: &str,
54 args: &[&str],
55 cwd: &str,
56 notification_tx: NotificationSender,
57 display_name: &str,
58 our_session_id: &str,
59 ) -> Result<Self, String> {
60 tracing::info!(
61 "[AcpProcess:{}] Spawning: {} {} (cwd: {})",
62 display_name,
63 command,
64 args.join(" "),
65 cwd,
66 );
67
68 let resolved_command =
71 crate::shell_env::which(command).unwrap_or_else(|| command.to_string());
72
73 let mut command_builder = tokio::process::Command::new(&resolved_command);
74 command_builder
75 .args(args)
76 .current_dir(cwd)
77 .env("PATH", crate::shell_env::full_path())
78 .env("NODE_NO_READLINE", "1")
79 .stdin(std::process::Stdio::piped())
80 .stdout(std::process::Stdio::piped())
81 .stderr(std::process::Stdio::piped());
82
83 if resolved_command.ends_with("codex-acp") && std::env::var_os("RUST_LOG").is_none() {
87 command_builder.env(
88 "RUST_LOG",
89 "info,codex_acp::thread=info,codex_acp::codex_agent=info",
90 );
91 }
92
93 let mut child = command_builder.spawn().map_err(|e| {
94 format!(
95 "Failed to spawn '{}' (resolved: '{}'): {}. Is it installed and in PATH?",
96 command, resolved_command, e
97 )
98 })?;
99
100 let stdin = child
101 .stdin
102 .take()
103 .ok_or_else(|| "No stdin on child process".to_string())?;
104 let stdout = child
105 .stdout
106 .take()
107 .ok_or_else(|| "No stdout on child process".to_string())?;
108 let stderr = child.stderr.take();
109
110 let alive = Arc::new(AtomicBool::new(true));
111 let pending: PendingMap = Arc::new(Mutex::new(HashMap::new()));
112 let stdin = Arc::new(Mutex::new(stdin));
113
114 let name = display_name.to_string();
115
116 if let Some(stderr) = stderr {
118 let name_clone = name.clone();
119 let ntx_stderr = notification_tx.clone();
120 let our_sid_stderr = our_session_id.to_string();
121 tokio::spawn(async move {
122 let reader = BufReader::new(stderr);
123 let mut lines = reader.lines();
124 while let Ok(Some(line)) = lines.next_line().await {
125 if !line.trim().is_empty() {
126 tracing::debug!("[AcpProcess:{} stderr] {}", name_clone, line);
127 let notification = serde_json::json!({
129 "jsonrpc": "2.0",
130 "method": "session/update",
131 "params": {
132 "sessionId": our_sid_stderr,
133 "update": {
134 "sessionUpdate": "process_output",
135 "source": "stderr",
136 "data": format!("{}\n", line),
137 "displayName": name_clone,
138 }
139 }
140 });
141 let _ = ntx_stderr.send(notification);
142 }
143 }
144 });
145 }
146
147 let alive_clone = alive.clone();
149 let pending_clone = pending.clone();
150 let ntx = notification_tx.clone();
151 let stdin_clone = stdin.clone();
152 let name_clone = name.clone();
153 let our_sid = our_session_id.to_string();
154 let cwd_clone = cwd.to_string();
155 let provider_clone = display_name.to_string();
156
157 let reader_handle = tokio::spawn(async move {
158 let reader = BufReader::new(stdout);
159 let mut lines = reader.lines();
160 let mut agent_msg_buffer = String::new();
162 let mut agent_thought_buffer = String::new();
164 let mut pending_tool_calls: std::collections::HashMap<String, (String, bool)> =
166 std::collections::HashMap::new();
167
168 while let Ok(Some(line)) = lines.next_line().await {
169 let line = line.trim().to_string();
170 if line.is_empty() {
171 continue;
172 }
173
174 let msg: serde_json::Value = match serde_json::from_str(&line) {
175 Ok(v) => v,
176 Err(_) => {
177 if let Some(v) = try_parse_embedded_json(&line) {
179 v
180 } else {
181 tracing::debug!(
182 "[AcpProcess:{}] Non-JSON stdout: {}",
183 name_clone,
184 &line[..line.len().min(200)]
185 );
186 continue;
187 }
188 }
189 };
190
191 let has_id = msg.get("id").is_some() && !msg.get("id").unwrap().is_null();
192 let has_result = msg.get("result").is_some();
193 let has_error = msg.get("error").is_some();
194 let has_method = msg.get("method").and_then(|m| m.as_str()).is_some();
195
196 if has_id && (has_result || has_error) {
197 let id = msg["id"].as_u64().unwrap_or(0);
199 let mut map = pending_clone.lock().await;
200 if let Some(tx) = map.remove(&id) {
201 if has_error {
202 let err_msg =
203 msg["error"]["message"].as_str().unwrap_or("unknown error");
204 let err_code = msg["error"]["code"].as_i64().unwrap_or(0);
205 let _ = tx.send(Err(format!("ACP Error [{}]: {}", err_code, err_msg)));
206 } else {
207 let _ = tx.send(Ok(msg["result"].clone()));
208 }
209 }
210 } else if has_id && has_method {
211 let method = msg["method"].as_str().unwrap_or("");
213 let id_val = msg["id"].clone();
214 tracing::info!(
215 "[AcpProcess:{}] Agent request: {} (id={})",
216 name_clone,
217 method,
218 id_val
219 );
220 let response =
221 handle_agent_request(method, &msg["params"], &our_sid, &ntx).await;
222 let reply = serde_json::json!({
223 "jsonrpc": "2.0",
224 "id": id_val,
225 "result": response,
226 });
227 let data = format!("{}\n", serde_json::to_string(&reply).unwrap());
228 let mut stdin = stdin_clone.lock().await;
229 let _ = stdin.write_all(data.as_bytes()).await;
230 let _ = stdin.flush().await;
231 } else if has_method {
232 let mut rewritten = msg.clone();
236 if let Some(params) = rewritten.get_mut("params") {
237 if params.get("sessionId").is_some() {
238 params["sessionId"] = serde_json::Value::String(our_sid.clone());
239 }
240 }
241
242 if let Some(params) = msg.get("params") {
244 if let Some(update) = params.get("update") {
245 let session_update = update
246 .get("sessionUpdate")
247 .and_then(|v| v.as_str())
248 .unwrap_or("");
249
250 match session_update {
251 "agent_thought_chunk" => {
252 let text = update
254 .get("content")
255 .and_then(|c| c.get("text"))
256 .and_then(|t| t.as_str())
257 .unwrap_or("");
258 agent_thought_buffer.push_str(text);
259 if agent_thought_buffer.len() >= 100 {
261 let record = TraceRecord::new(
262 &our_sid,
263 TraceEventType::AgentThought,
264 Contributor::new(&provider_clone, None),
265 )
266 .with_conversation(TraceConversation {
267 turn: None,
268 role: Some("assistant".to_string()),
269 content_preview: Some(
270 agent_thought_buffer
271 [..agent_thought_buffer.len().min(200)]
272 .to_string(),
273 ),
274 full_content: Some(agent_thought_buffer.clone()),
275 });
276 let writer = TraceWriter::new(&cwd_clone);
277 let _ = writer.append_safe(&record).await;
278 agent_thought_buffer.clear();
279 }
280 }
281 "agent_message_chunk" => {
282 let text = update
284 .get("content")
285 .and_then(|c| c.get("text"))
286 .and_then(|t| t.as_str())
287 .unwrap_or("");
288 agent_msg_buffer.push_str(text);
289 if agent_msg_buffer.len() >= 100 {
291 let record = TraceRecord::new(
292 &our_sid,
293 TraceEventType::AgentMessage,
294 Contributor::new(&provider_clone, None),
295 )
296 .with_conversation(TraceConversation {
297 turn: None,
298 role: Some("assistant".to_string()),
299 content_preview: Some(
300 agent_msg_buffer[..agent_msg_buffer.len().min(200)]
301 .to_string(),
302 ),
303 full_content: Some(agent_msg_buffer.clone()),
304 });
305 let writer = TraceWriter::new(&cwd_clone);
306 let _ = writer.append_safe(&record).await;
307 agent_msg_buffer.clear();
308 }
309 }
310 "agent_message" => {
311 let text = update
313 .get("content")
314 .and_then(|c| c.get("text"))
315 .and_then(|t| t.as_str())
316 .unwrap_or("");
317 let record = TraceRecord::new(
318 &our_sid,
319 TraceEventType::AgentMessage,
320 Contributor::new(&provider_clone, None),
321 )
322 .with_conversation(TraceConversation {
323 turn: None,
324 role: Some("assistant".to_string()),
325 content_preview: Some(
326 text[..text.len().min(200)].to_string(),
327 ),
328 full_content: Some(text.to_string()),
329 });
330 let writer = TraceWriter::new(&cwd_clone);
331 let _ = writer.append_safe(&record).await;
332 }
333 "tool_call" => {
334 let tool_call_id =
336 update.get("toolCallId").and_then(|v| v.as_str());
337 let kind = update
338 .get("kind")
339 .and_then(|v| v.as_str())
340 .or_else(|| update.get("title").and_then(|v| v.as_str()))
341 .unwrap_or("unknown");
342 let raw_input = update.get("rawInput").cloned();
343
344 let has_input = raw_input.as_ref().is_some_and(|v| {
346 if let Some(obj) = v.as_object() {
347 !obj.is_empty()
348 } else {
349 !v.is_null()
350 }
351 });
352
353 if has_input {
354 let record = TraceRecord::new(
356 &our_sid,
357 TraceEventType::ToolCall,
358 Contributor::new(&provider_clone, None),
359 )
360 .with_tool(TraceTool {
361 name: kind.to_string(),
362 tool_call_id: tool_call_id.map(|s| s.to_string()),
363 status: Some("running".to_string()),
364 input: raw_input,
365 output: None,
366 });
367 let writer = TraceWriter::new(&cwd_clone);
368 let _ = writer.append_safe(&record).await;
369 } else if let Some(id) = tool_call_id {
370 pending_tool_calls
372 .insert(id.to_string(), (kind.to_string(), false));
373 }
374 }
375 "tool_call_update" => {
376 let tool_call_id =
378 update.get("toolCallId").and_then(|v| v.as_str());
379 let kind = update
380 .get("kind")
381 .and_then(|v| v.as_str())
382 .or_else(|| update.get("title").and_then(|v| v.as_str()))
383 .unwrap_or("unknown");
384 let raw_input = update.get("rawInput").cloned();
385 let raw_output = update
386 .get("rawOutput")
387 .and_then(|v| v.as_str())
388 .map(|s| serde_json::Value::String(s.to_string()))
389 .or_else(|| update.get("rawOutput").cloned());
390 let status = update
391 .get("status")
392 .and_then(|v| v.as_str())
393 .unwrap_or("completed");
394
395 let has_input = raw_input.as_ref().is_some_and(|v| {
397 if let Some(obj) = v.as_object() {
398 !obj.is_empty()
399 } else {
400 !v.is_null()
401 }
402 });
403
404 if let Some(id) = tool_call_id {
405 if let Some((stored_kind, traced)) =
406 pending_tool_calls.get_mut(id)
407 {
408 if has_input && !*traced {
409 let record = TraceRecord::new(
411 &our_sid,
412 TraceEventType::ToolCall,
413 Contributor::new(&provider_clone, None),
414 )
415 .with_tool(TraceTool {
416 name: stored_kind.clone(),
417 tool_call_id: Some(id.to_string()),
418 status: Some("running".to_string()),
419 input: raw_input.clone(),
420 output: None,
421 });
422 let writer = TraceWriter::new(&cwd_clone);
423 let _ = writer.append_safe(&record).await;
424 *traced = true;
425 }
426 }
427 }
428
429 let is_complete = status == "completed"
431 || status == "failed"
432 || raw_output.is_some();
433 if is_complete {
434 let record = TraceRecord::new(
435 &our_sid,
436 TraceEventType::ToolResult,
437 Contributor::new(&provider_clone, None),
438 )
439 .with_tool(TraceTool {
440 name: kind.to_string(),
441 tool_call_id: tool_call_id.map(|s| s.to_string()),
442 status: Some(status.to_string()),
443 input: None,
444 output: raw_output,
445 });
446 let writer = TraceWriter::new(&cwd_clone);
447 let _ = writer.append_safe(&record).await;
448
449 if let Some(id) = tool_call_id {
451 pending_tool_calls.remove(id);
452 }
453 }
454 }
455 _ => {}
456 }
457 }
458 }
459
460 let _ = ntx.send(rewritten);
461 } else {
462 tracing::debug!(
463 "[AcpProcess:{}] Unhandled message: {}",
464 name_clone,
465 &line[..line.len().min(200)]
466 );
467 }
468 }
469
470 if !agent_msg_buffer.is_empty() {
472 let record = TraceRecord::new(
473 &our_sid,
474 TraceEventType::AgentMessage,
475 Contributor::new(&provider_clone, None),
476 )
477 .with_conversation(TraceConversation {
478 turn: None,
479 role: Some("assistant".to_string()),
480 content_preview: Some(
481 agent_msg_buffer[..agent_msg_buffer.len().min(200)].to_string(),
482 ),
483 full_content: Some(agent_msg_buffer.clone()),
484 });
485 let writer = TraceWriter::new(&cwd_clone);
486 let _ = writer.append_safe(&record).await;
487 }
488
489 if !agent_thought_buffer.is_empty() {
491 let record = TraceRecord::new(
492 &our_sid,
493 TraceEventType::AgentThought,
494 Contributor::new(&provider_clone, None),
495 )
496 .with_conversation(TraceConversation {
497 turn: None,
498 role: Some("assistant".to_string()),
499 content_preview: Some(
500 agent_thought_buffer[..agent_thought_buffer.len().min(200)].to_string(),
501 ),
502 full_content: Some(agent_thought_buffer.clone()),
503 });
504 let writer = TraceWriter::new(&cwd_clone);
505 let _ = writer.append_safe(&record).await;
506 }
507
508 alive_clone.store(false, Ordering::SeqCst);
509 tracing::info!("[AcpProcess:{}] stdout reader finished", name_clone);
510 });
511
512 tokio::time::sleep(Duration::from_millis(300)).await;
514
515 if !alive.load(Ordering::SeqCst) {
516 return Err(format!("{} process died during startup", display_name));
517 }
518
519 tracing::info!("[AcpProcess:{}] Process started", display_name);
520
521 Ok(Self {
522 stdin,
523 child: Arc::new(Mutex::new(Some(child))),
524 pending,
525 next_id: Arc::new(AtomicU64::new(1)),
526 alive,
527 notification_tx,
528 display_name: display_name.to_string(),
529 command: command.to_string(),
530 _reader_handle: reader_handle,
531 })
532 }
533
534 pub fn is_alive(&self) -> bool {
536 self.alive.load(Ordering::SeqCst)
537 }
538
539 pub async fn send_request(
541 &self,
542 method: &str,
543 params: serde_json::Value,
544 timeout_ms: Option<u64>,
545 ) -> Result<serde_json::Value, String> {
546 if !self.is_alive() {
547 return Err(format!("{} process is not alive", self.display_name));
548 }
549
550 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
551 let (tx, rx) = oneshot::channel();
552
553 self.pending.lock().await.insert(id, tx);
554
555 let msg = serde_json::json!({
556 "jsonrpc": "2.0",
557 "id": id,
558 "method": method,
559 "params": params,
560 });
561 let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
562
563 {
564 let mut stdin = self.stdin.lock().await;
565 stdin
566 .write_all(data.as_bytes())
567 .await
568 .map_err(|e| format!("Write {}: {}", method, e))?;
569 stdin
570 .flush()
571 .await
572 .map_err(|e| format!("Flush {}: {}", method, e))?;
573 }
574
575 let is_npx_or_uvx = self.command == "npx" || self.command == "uvx";
578 let default_timeout = match method {
579 "initialize" | "session/new" => {
580 if is_npx_or_uvx {
581 120_000 } else {
583 15_000 }
585 }
586 "session/prompt" => 300_000, _ => 30_000,
588 };
589 let timeout_dur = Duration::from_millis(timeout_ms.unwrap_or(default_timeout));
590
591 match tokio::time::timeout(timeout_dur, rx).await {
592 Ok(Ok(result)) => result,
593 Ok(Err(_)) => Err(format!("Channel closed for {} (id={})", method, id)),
594 Err(_) => {
595 self.pending.lock().await.remove(&id);
596 Err(format!(
597 "Timeout waiting for {} (id={}, {}ms)",
598 method,
599 id,
600 timeout_dur.as_millis()
601 ))
602 }
603 }
604 }
605
606 pub async fn initialize(&self) -> Result<serde_json::Value, String> {
608 self.initialize_with_timeout(None).await
609 }
610
611 pub async fn initialize_with_timeout(
613 &self,
614 timeout_ms: Option<u64>,
615 ) -> Result<serde_json::Value, String> {
616 let result = self
617 .send_request(
618 "initialize",
619 serde_json::json!({
620 "protocolVersion": 1,
621 "clientInfo": {
622 "name": "routa-desktop",
623 "version": "0.1.0"
624 }
625 }),
626 timeout_ms,
627 )
628 .await?;
629 tracing::info!(
630 "[AcpProcess:{}] Initialized: {}",
631 self.display_name,
632 serde_json::to_string(&result).unwrap_or_default()
633 );
634 Ok(result)
635 }
636
637 pub async fn new_session(&self, cwd: &str) -> Result<String, String> {
639 let result = self
640 .send_request(
641 "session/new",
642 serde_json::json!({
643 "cwd": cwd,
644 "mcpServers": []
645 }),
646 None,
647 )
648 .await?;
649
650 let session_id = result["sessionId"]
651 .as_str()
652 .ok_or_else(|| "No sessionId in session/new response".to_string())?
653 .to_string();
654
655 tracing::info!(
656 "[AcpProcess:{}] Session created: {}",
657 self.display_name,
658 session_id
659 );
660 Ok(session_id)
661 }
662
663 pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
665 self.send_request(
666 "session/prompt",
667 serde_json::json!({
668 "sessionId": session_id,
669 "prompt": [{ "type": "text", "text": text }]
670 }),
671 Some(300_000),
672 )
673 .await
674 }
675
676 pub async fn cancel(&self, session_id: &str) {
678 let msg = serde_json::json!({
679 "jsonrpc": "2.0",
680 "method": "session/cancel",
681 "params": { "sessionId": session_id }
682 });
683 let data = format!("{}\n", serde_json::to_string(&msg).unwrap());
684 let mut stdin = self.stdin.lock().await;
685 let _ = stdin.write_all(data.as_bytes()).await;
686 let _ = stdin.flush().await;
687 }
688
689 pub fn notification_sender(&self) -> &NotificationSender {
691 &self.notification_tx
692 }
693
694 pub async fn kill(&self) {
696 self.alive.store(false, Ordering::SeqCst);
697 if let Some(mut child) = self.child.lock().await.take() {
698 tracing::info!("[AcpProcess:{}] Killing process", self.display_name);
699 let _ = child.kill().await;
700 }
701 let mut map = self.pending.lock().await;
703 for (_, tx) in map.drain() {
704 let _ = tx.send(Err("Process killed".to_string()));
705 }
706 }
707}
708
709async fn handle_agent_request(
711 method: &str,
712 params: &serde_json::Value,
713 session_id: &str,
714 notification_tx: &NotificationSender,
715) -> serde_json::Value {
716 match method {
717 "session/request_permission" => {
718 serde_json::json!({
720 "outcome": { "outcome": "approved" }
721 })
722 }
723 "fs/read_text_file" => {
724 let path = params["path"].as_str().unwrap_or("");
725 match tokio::fs::read_to_string(path).await {
726 Ok(content) => serde_json::json!({ "content": content }),
727 Err(e) => serde_json::json!({
728 "error": format!("Failed to read file: {}", e)
729 }),
730 }
731 }
732 "fs/write_text_file" => {
733 let path = params["path"].as_str().unwrap_or("");
734 let content = params["content"].as_str().unwrap_or("");
735 if let Some(parent) = std::path::Path::new(path).parent() {
736 let _ = tokio::fs::create_dir_all(parent).await;
737 }
738 match tokio::fs::write(path, content).await {
739 Ok(_) => serde_json::json!({}),
740 Err(e) => serde_json::json!({
741 "error": format!("Failed to write file: {}", e)
742 }),
743 }
744 }
745 "terminal/create" => {
746 match TerminalManager::global()
747 .create(params, session_id, notification_tx)
748 .await
749 {
750 Ok(result) => result,
751 Err(error) => serde_json::json!({ "error": error }),
752 }
753 }
754 "terminal/output" => {
755 let terminal_id = params["terminalId"].as_str().unwrap_or("");
756 match TerminalManager::global().get_output(terminal_id).await {
757 Ok(result) => result,
758 Err(error) => serde_json::json!({ "error": error }),
759 }
760 }
761 "terminal/wait_for_exit" => {
762 let terminal_id = params["terminalId"].as_str().unwrap_or("");
763 match TerminalManager::global().wait_for_exit(terminal_id).await {
764 Ok(result) => result,
765 Err(error) => serde_json::json!({ "error": error }),
766 }
767 }
768 "terminal/kill" => {
769 let terminal_id = params["terminalId"].as_str().unwrap_or("");
770 match TerminalManager::global().kill(terminal_id).await {
771 Ok(_) => serde_json::json!({}),
772 Err(error) => serde_json::json!({ "error": error }),
773 }
774 }
775 "terminal/release" => {
776 let terminal_id = params["terminalId"].as_str().unwrap_or("");
777 TerminalManager::global().release(terminal_id).await;
778 serde_json::json!({})
779 }
780 _ => {
781 tracing::warn!("[AcpProcess] Unknown agent request: {}", method);
782 serde_json::json!({})
783 }
784 }
785}
786
787fn try_parse_embedded_json(line: &str) -> Option<serde_json::Value> {
789 let mut depth = 0i32;
790 let mut start = None;
791
792 for (i, ch) in line.char_indices() {
793 match ch {
794 '{' => {
795 if depth == 0 {
796 start = Some(i);
797 }
798 depth += 1;
799 }
800 '}' => {
801 depth -= 1;
802 if depth == 0 {
803 if let Some(s) = start {
804 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line[s..=i]) {
805 return Some(v);
806 }
807 }
808 start = None;
809 }
810 }
811 _ => {}
812 }
813 }
814 None
815}