1use std::path::PathBuf;
18use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
23use tokio::process::{Child, Command};
24use tokio::sync::Mutex;
25use tokio::task::JoinHandle;
26use tracing::{debug, error, info, warn};
27
28use super::codex_protocol::*;
29use super::{send_agent_output, AgentBackend, AgentOutput, AgentSession, BackendType, SpawnConfig};
30use crate::{Error, Result};
31
/// Capacity of the bounded `tokio::sync::mpsc` channel that carries
/// `AgentOutput` values from the background stdout reader to the session's
/// consumer.
const OUTPUT_CHANNEL_SIZE: usize = 256;
34
/// Agent backend that launches the `codex` command-line tool.
///
/// The only state is the path of the executable to run, so the value is
/// cheap to clone.
#[derive(Clone, Debug)]
pub struct CodexBackend {
    /// Path of the `codex` executable started for every spawned agent.
    codex_path: PathBuf,
}
45
46impl CodexBackend {
47 pub fn new() -> Result<Self> {
49 let path = which::which("codex").map_err(|_| Error::CliNotFound {
50 name: "codex".into(),
51 })?;
52 Ok(Self { codex_path: path })
53 }
54
55 pub fn with_path(path: impl Into<PathBuf>) -> Self {
57 Self {
58 codex_path: path.into(),
59 }
60 }
61
62 fn spawn_child(&self, config: &SpawnConfig) -> Result<Child> {
64 let mut cmd = Command::new(&self.codex_path);
65 cmd.arg("app-server");
68
69 if let Some(ref model) = config.model {
71 cmd.arg("-c").arg(format!("model=\"{model}\""));
72 }
73
74 if let Some(ref effort) = config.reasoning_effort {
76 cmd.arg("-c")
77 .arg(format!("model_reasoning_effort=\"{effort}\""));
78 }
79
80 cmd.stdin(std::process::Stdio::piped());
81 cmd.stdout(std::process::Stdio::piped());
82 cmd.stderr(std::process::Stdio::null());
86
87 if let Some(ref cwd) = config.cwd {
88 cmd.current_dir(cwd);
89 }
90
91 for (k, v) in &config.env {
92 cmd.env(k, v);
93 }
94
95 cmd.kill_on_drop(true);
96 let child = cmd.spawn().map_err(|e| Error::SpawnFailed {
97 name: config.name.clone(),
98 reason: format!("Failed to start codex process: {e}"),
99 })?;
100
101 Ok(child)
102 }
103}
104
105#[async_trait]
106impl AgentBackend for CodexBackend {
107 fn backend_type(&self) -> BackendType {
108 BackendType::Codex
109 }
110
111 async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>> {
112 let agent_name = config.name.clone();
113 let initial_prompt = config.prompt.clone();
114
115 info!(agent = %agent_name, "Spawning Codex agent");
116
117 let mut child = self.spawn_child(&config)?;
118
119 let stdin = child.stdin.take().ok_or_else(|| Error::SpawnFailed {
121 name: agent_name.clone(),
122 reason: "Failed to capture stdin".into(),
123 })?;
124 let stdout = child.stdout.take().ok_or_else(|| Error::SpawnFailed {
125 name: agent_name.clone(),
126 reason: "Failed to capture stdout".into(),
127 })?;
128
129 let stdin_writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
130 let mut stdout_reader = BufReader::new(stdout);
131 let request_id = Arc::new(AtomicU64::new(1));
132 let alive = Arc::new(AtomicBool::new(true));
133
134 let init_id = next_id(&request_id);
136 let init_req = JsonRpcRequest::new(
137 init_id,
138 METHOD_INITIALIZE,
139 Some(serde_json::json!({
140 "clientInfo": {
141 "name": "agent-teams",
142 "version": env!("CARGO_PKG_VERSION")
143 }
144 })),
145 );
146 send_request(&stdin_writer, &init_req).await?;
147 let init_resp = wait_for_response(&mut stdout_reader, init_id).await?;
148 let user_agent = init_resp
149 .result
150 .as_ref()
151 .and_then(|r| r.get("userAgent"))
152 .and_then(|v| v.as_str())
153 .unwrap_or("unknown");
154 debug!(agent = %agent_name, user_agent = %user_agent, "Initialize handshake complete");
155
156 let initialized_notif = JsonRpcClientNotification::new(METHOD_INITIALIZED);
158 send_notification(&stdin_writer, &initialized_notif).await?;
159 debug!(agent = %agent_name, "Sent 'initialized' notification");
160
161 let thread_id_num = next_id(&request_id);
163 let cwd = config
164 .cwd
165 .as_ref()
166 .map(|p| p.display().to_string())
167 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default().display().to_string());
168
169 let thread_req = JsonRpcRequest::new(
170 thread_id_num,
171 METHOD_THREAD_START,
172 Some(serde_json::json!({
173 "cwd": cwd,
174 "approvalPolicy": "never"
175 })),
176 );
177 send_request(&stdin_writer, &thread_req).await?;
178 let thread_resp = wait_for_response(&mut stdout_reader, thread_id_num).await?;
179
180 let thread_id = thread_resp
181 .result
182 .as_ref()
183 .and_then(|r| r.get("thread"))
184 .and_then(|t| t.get("id"))
185 .and_then(|v| v.as_str())
186 .map(|s| s.to_string())
187 .ok_or_else(|| Error::SpawnFailed {
188 name: agent_name.clone(),
189 reason: "thread/start response missing thread.id".into(),
190 })?;
191
192 debug!(
193 agent = %agent_name,
194 thread_id = %thread_id,
195 "Thread created"
196 );
197
198 let turn_id = next_id(&request_id);
200 let turn_req = JsonRpcRequest::new(
201 turn_id,
202 METHOD_TURN_START,
203 Some(serde_json::json!({
204 "threadId": thread_id,
205 "input": [
206 {
207 "type": "text",
208 "text": initial_prompt
209 }
210 ]
211 })),
212 );
213 send_request(&stdin_writer, &turn_req).await?;
214
215 let (output_tx, output_rx) = tokio::sync::mpsc::channel(OUTPUT_CHANNEL_SIZE);
217 let reader_alive = alive.clone();
218 let reader_name = agent_name.clone();
219
220 let reader_handle = tokio::spawn(async move {
221 debug!(agent = %reader_name, "Background Codex reader started");
222 let mut line_buf = String::new();
223
224 loop {
225 if !reader_alive.load(Ordering::Relaxed) {
226 break;
227 }
228
229 line_buf.clear();
230 match stdout_reader.read_line(&mut line_buf).await {
231 Ok(0) => {
232 debug!(agent = %reader_name, "Codex stdout EOF");
234 reader_alive.store(false, Ordering::Relaxed);
235 let _ = output_tx.send(AgentOutput::Idle).await;
236 break;
237 }
238 Ok(_) => {
239 let trimmed = line_buf.trim();
240 if trimmed.is_empty() {
241 continue;
242 }
243
244 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
246 Ok(JsonRpcMessage::Notification(notif)) => {
247 if let Some(output) = map_notification_to_output(¬if)
248 && send_agent_output(
249 &output_tx, output, &reader_alive, &reader_name,
250 ).await.is_err()
251 {
252 break;
253 }
254 }
255 Ok(JsonRpcMessage::Response(resp)) => {
256 if let Some(err) = resp.error {
258 if output_tx.send(AgentOutput::Error(err.to_string())).await.is_err() {
260 reader_alive.store(false, Ordering::Relaxed);
261 break;
262 }
263 }
264 }
265 Err(e) => {
266 warn!(
267 agent = %reader_name,
268 line = %trimmed,
269 error = %e,
270 "Failed to parse Codex output line"
271 );
272 }
273 }
274 }
275 Err(e) => {
276 error!(agent = %reader_name, error = %e, "Error reading Codex stdout");
277 let _ = output_tx.send(AgentOutput::Error(format!("Read error: {e}"))).await;
279 reader_alive.store(false, Ordering::Relaxed);
280 break;
281 }
282 }
283 }
284 debug!(agent = %reader_name, "Background Codex reader stopped");
285 });
286
287 let session = CodexSession {
288 name: agent_name,
289 child: Some(child),
290 stdin: stdin_writer,
291 thread_id,
292 request_id,
293 output_rx: Some(output_rx),
294 alive,
295 reader_handle: Some(reader_handle),
296 };
297
298 Ok(Box::new(session))
299 }
300}
301
/// A running Codex app-server process together with the plumbing used to
/// talk to it.
struct CodexSession {
    /// Agent name; returned by `name()` and attached to log events.
    name: String,
    /// Handle of the spawned process, waited on or killed during shutdown.
    child: Option<Child>,
    /// Buffered writer over the child's stdin, locked for each message sent.
    stdin: Arc<Mutex<BufWriter<tokio::process::ChildStdin>>>,
    /// Thread id taken from the thread-start response; sent as `threadId`
    /// with every turn request.
    thread_id: String,
    /// Counter from which JSON-RPC request ids are drawn.
    request_id: Arc<AtomicU64>,
    /// Receiving end of the output channel; handed out once by
    /// `output_receiver()` and `None` afterwards.
    output_rx: Option<tokio::sync::mpsc::Receiver<AgentOutput>>,
    /// Liveness flag shared with the reader task; cleared on stdout EOF,
    /// read errors, shutdown and force-kill.
    alive: Arc<AtomicBool>,
    /// Background stdout reader task; aborted on shutdown, force-kill or drop.
    reader_handle: Option<JoinHandle<()>>,
}
317
318#[async_trait]
319impl AgentSession for CodexSession {
320 fn name(&self) -> &str {
321 &self.name
322 }
323
324 async fn send_input(&mut self, input: &str) -> Result<()> {
325 if !self.alive.load(Ordering::Relaxed) {
326 return Err(Error::AgentNotAlive {
327 name: self.name.clone(),
328 });
329 }
330
331 let id = next_id(&self.request_id);
332 let req = JsonRpcRequest::new(
333 id,
334 METHOD_TURN_START,
335 Some(serde_json::json!({
336 "threadId": self.thread_id,
337 "input": [
338 {
339 "type": "text",
340 "text": input
341 }
342 ]
343 })),
344 );
345 send_request(&self.stdin, &req).await
346 }
347
348 fn output_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<AgentOutput>> {
349 self.output_rx.take()
350 }
351
352 async fn is_alive(&self) -> bool {
353 self.alive.load(Ordering::Relaxed)
354 }
355
356 async fn shutdown(&mut self) -> Result<()> {
357 info!(agent = %self.name, "Shutting down Codex session");
358 self.alive.store(false, Ordering::Relaxed);
359
360 if let Some(handle) = self.reader_handle.take() {
362 handle.abort();
363 let _ = handle.await;
364 }
365
366 {
368 let mut writer = self.stdin.lock().await;
369 let _ = writer.shutdown().await;
370 }
371
372 if let Some(ref mut child) = self.child {
374 let timeout =
375 tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
376
377 if timeout.is_err() {
378 warn!(agent = %self.name, "Codex child did not exit in time, killing");
379 let _ = child.kill().await;
380 }
381 }
382
383 Ok(())
384 }
385
386 async fn force_kill(&mut self) -> Result<()> {
387 info!(agent = %self.name, "Force-killing Codex session");
388 self.alive.store(false, Ordering::Relaxed);
389
390 if let Some(handle) = self.reader_handle.take() {
392 handle.abort();
393 let _ = handle.await;
394 }
395
396 if let Some(ref mut child) = self.child {
397 child.kill().await.map_err(|e| {
398 Error::CodexProtocol {
399 reason: format!(
400 "Failed to kill Codex process for {}: {e}",
401 self.name
402 ),
403 }
404 })?;
405 }
406
407 Ok(())
408 }
409}
410
411impl Drop for CodexSession {
412 fn drop(&mut self) {
413 if let Some(handle) = self.reader_handle.take() {
416 handle.abort();
417 }
418 }
419}
420
/// Hands out the next JSON-RPC request id.
///
/// Returns the counter's current value and advances it by one in a single
/// atomic step.
fn next_id(counter: &AtomicU64) -> u64 {
    const STEP: u64 = 1;
    counter.fetch_add(STEP, Ordering::Relaxed)
}
429
430async fn send_request(
432 writer: &Arc<Mutex<BufWriter<tokio::process::ChildStdin>>>,
433 request: &JsonRpcRequest,
434) -> Result<()> {
435 let line = serde_json::to_string(request)?;
436 let mut w = writer.lock().await;
437 w.write_all(line.as_bytes())
438 .await
439 .map_err(|e| Error::CodexProtocol { reason: format!("Failed to write to Codex stdin: {e}") })?;
440 w.write_all(b"\n")
441 .await
442 .map_err(|e| Error::CodexProtocol { reason: format!("Failed to write newline to Codex stdin: {e}") })?;
443 w.flush()
444 .await
445 .map_err(|e| Error::CodexProtocol { reason: format!("Failed to flush Codex stdin: {e}") })?;
446 Ok(())
447}
448
449async fn send_notification(
451 writer: &Arc<Mutex<BufWriter<tokio::process::ChildStdin>>>,
452 notification: &JsonRpcClientNotification,
453) -> Result<()> {
454 let line = serde_json::to_string(notification)?;
455 let mut w = writer.lock().await;
456 w.write_all(line.as_bytes())
457 .await
458 .map_err(|e| Error::CodexProtocol { reason: format!("Failed to write notification to Codex stdin: {e}") })?;
459 w.write_all(b"\n")
460 .await
461 .map_err(|e| Error::CodexProtocol { reason: format!("Failed to write newline to Codex stdin: {e}") })?;
462 w.flush()
463 .await
464 .map_err(|e| Error::CodexProtocol { reason: format!("Failed to flush Codex stdin: {e}") })?;
465 Ok(())
466}
467
468async fn wait_for_response(
472 reader: &mut BufReader<tokio::process::ChildStdout>,
473 expected_id: u64,
474) -> Result<JsonRpcResponse> {
475 let expected_val = serde_json::Value::Number(expected_id.into());
476 let mut line_buf = String::new();
477
478 let timeout_duration = std::time::Duration::from_secs(30);
479 let deadline = tokio::time::Instant::now() + timeout_duration;
480
481 loop {
482 line_buf.clear();
483
484 let read_result = tokio::time::timeout_at(deadline, reader.read_line(&mut line_buf))
485 .await
486 .map_err(|_| Error::Timeout { seconds: 30 })?
487 .map_err(|e| Error::CodexProtocol { reason: format!("Read error waiting for response: {e}") })?;
488
489 if read_result == 0 {
490 return Err(Error::CodexProtocol {
491 reason: "Codex process closed stdout before responding".into(),
492 });
493 }
494
495 let trimmed = line_buf.trim();
496 if trimmed.is_empty() {
497 continue;
498 }
499
500 if let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(trimmed)
502 && resp.id == expected_val
503 {
504 if let Some(ref err) = resp.error {
505 return Err(Error::CodexProtocol { reason: format!("Codex RPC error: {err}") });
506 }
507 return Ok(resp);
508 }
509 }
511}
512
513fn map_notification_to_output(notif: &JsonRpcNotification) -> Option<AgentOutput> {
514 match notif.method.as_str() {
515 EVENT_AGENT_MESSAGE_DELTA => {
516 let text = notif
518 .params
519 .as_ref()
520 .and_then(|p| p.get("delta"))
521 .and_then(|v| v.as_str())
522 .unwrap_or_default();
523
524 if text.is_empty() {
525 None
526 } else {
527 Some(AgentOutput::Delta(text.to_string()))
528 }
529 }
530 EVENT_COMMAND_OUTPUT_DELTA => {
531 let text = notif
533 .params
534 .as_ref()
535 .and_then(|p| p.get("delta"))
536 .and_then(|v| v.as_str())
537 .unwrap_or_default();
538
539 if text.is_empty() {
540 None
541 } else {
542 Some(AgentOutput::Delta(text.to_string()))
543 }
544 }
545 EVENT_ITEM_COMPLETED => {
546 let item = notif
549 .params
550 .as_ref()
551 .and_then(|p| p.get("item"));
552
553 let is_agent_message = item
554 .and_then(|i| i.get("type"))
555 .and_then(|t| t.as_str())
556 == Some("agentMessage");
557
558 if !is_agent_message {
559 return None;
560 }
561
562 let text: String = item
564 .and_then(|i| i.get("content"))
565 .and_then(|c| c.as_array())
566 .map(|arr| {
567 arr.iter()
568 .filter_map(|part| {
569 if part.get("type").and_then(|t| t.as_str()) == Some("text") {
570 part.get("text").and_then(|t| t.as_str())
571 } else {
572 None
573 }
574 })
575 .collect::<Vec<_>>()
576 .join("")
577 })
578 .unwrap_or_default();
579
580 if text.is_empty() {
581 None
582 } else {
583 Some(AgentOutput::Message(text))
584 }
585 }
586 EVENT_TURN_COMPLETED => Some(AgentOutput::TurnComplete),
587 EVENT_ERROR => {
588 let message = notif
589 .params
590 .as_ref()
591 .and_then(|p| p.get("message"))
592 .and_then(|v| v.as_str())
593 .unwrap_or("Unknown error");
594 Some(AgentOutput::Error(message.to_string()))
595 }
596 EVENT_THREAD_STARTED | EVENT_TURN_STARTED | EVENT_ITEM_STARTED => None,
598 other => {
600 if !other.starts_with("codex/event/") {
601 debug!(method = %notif.method, "Unhandled Codex notification");
602 }
603 None
604 }
605 }
606}