Skip to main content

agent_teams/backend/
codex.rs

1//! Codex backend -- spawns agents via the `codex app-server` stdio protocol.
2//!
3//! The Codex app-server uses a JSON-RPC-like protocol (without the `jsonrpc` field)
4//! over stdin/stdout with newline-delimited JSON messages.
5//!
6//! The lifecycle of a Codex session:
7//!
8//! 1. Spawn `codex app-server` as a child process.
9//! 2. Send `initialize` with `{clientInfo: {name, version}}` → receive `{userAgent}`.
10//! 3. Send `initialized` notification (no `id`).
11//! 4. Send `thread/start` with `{cwd, approvalPolicy}` → receive `{thread: {id, ...}}`.
12//! 5. Send `turn/start` with `{threadId, input: [{type: "text", text: "..."}]}`.
13//! 6. A background task reads stdout line-by-line, parsing messages
14//!    and forwarding them as [`AgentOutput`] events through an mpsc channel.
15//! 7. Follow-up inputs are sent as additional `turn/start` requests.
16
17use std::path::PathBuf;
18use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
23use tokio::process::{Child, Command};
24use tokio::sync::Mutex;
25use tokio::task::JoinHandle;
26use tracing::{debug, error, info, warn};
27
28use super::codex_protocol::*;
29use super::{send_agent_output, AgentBackend, AgentOutput, AgentSession, BackendType, SpawnConfig};
30use crate::{Error, Result};
31
/// Capacity of the bounded mpsc channel that carries [`AgentOutput`] events
/// from a session's background stdout reader to its consumer.
const OUTPUT_CHANNEL_SIZE: usize = 256;
34
35// ---------------------------------------------------------------------------
36// CodexBackend  (factory)
37// ---------------------------------------------------------------------------
38
/// Session factory for the Codex CLI: every `spawn` launches a dedicated
/// `codex app-server` subprocess and talks to it over stdio.
#[derive(Debug, Clone)]
pub struct CodexBackend {
    /// Executable that is launched (with the `app-server` subcommand) per session.
    codex_path: PathBuf,
}
45
46impl CodexBackend {
47    /// Locate the `codex` binary on `$PATH` via `which`.
48    pub fn new() -> Result<Self> {
49        let path = which::which("codex").map_err(|_| Error::CliNotFound {
50            name: "codex".into(),
51        })?;
52        Ok(Self { codex_path: path })
53    }
54
55    /// Use an explicit path to the `codex` binary.
56    pub fn with_path(path: impl Into<PathBuf>) -> Self {
57        Self {
58            codex_path: path.into(),
59        }
60    }
61
62    /// Spawn the Codex child process.
63    fn spawn_child(&self, config: &SpawnConfig) -> Result<Child> {
64        let mut cmd = Command::new(&self.codex_path);
65        // `codex app-server` reads JSON from stdin, writes JSON to stdout.
66        // No additional flags needed -- stdio is the default transport.
67        cmd.arg("app-server");
68
69        // Pass model override via -c config flag
70        if let Some(ref model) = config.model {
71            cmd.arg("-c").arg(format!("model=\"{model}\""));
72        }
73
74        // Pass reasoning effort override via -c config flag
75        if let Some(ref effort) = config.reasoning_effort {
76            cmd.arg("-c")
77                .arg(format!("model_reasoning_effort=\"{effort}\""));
78        }
79
80        cmd.stdin(std::process::Stdio::piped());
81        cmd.stdout(std::process::Stdio::piped());
82        // Discard stderr to avoid pipe-buffer deadlock: if the child writes
83        // enough to stderr without anyone reading it, the OS buffer fills and
84        // the child blocks, stalling stdout as well.
85        cmd.stderr(std::process::Stdio::null());
86
87        if let Some(ref cwd) = config.cwd {
88            cmd.current_dir(cwd);
89        }
90
91        for (k, v) in &config.env {
92            cmd.env(k, v);
93        }
94
95        cmd.kill_on_drop(true);
96        let child = cmd.spawn().map_err(|e| Error::SpawnFailed {
97            name: config.name.clone(),
98            reason: format!("Failed to start codex process: {e}"),
99        })?;
100
101        Ok(child)
102    }
103}
104
105#[async_trait]
106impl AgentBackend for CodexBackend {
107    fn backend_type(&self) -> BackendType {
108        BackendType::Codex
109    }
110
111    async fn spawn(&self, config: SpawnConfig) -> Result<Box<dyn AgentSession>> {
112        let agent_name = config.name.clone();
113        let initial_prompt = config.prompt.clone();
114
115        info!(agent = %agent_name, "Spawning Codex agent");
116
117        let mut child = self.spawn_child(&config)?;
118
119        // Take ownership of stdin/stdout
120        let stdin = child.stdin.take().ok_or_else(|| Error::SpawnFailed {
121            name: agent_name.clone(),
122            reason: "Failed to capture stdin".into(),
123        })?;
124        let stdout = child.stdout.take().ok_or_else(|| Error::SpawnFailed {
125            name: agent_name.clone(),
126            reason: "Failed to capture stdout".into(),
127        })?;
128
129        let stdin_writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
130        let mut stdout_reader = BufReader::new(stdout);
131        let request_id = Arc::new(AtomicU64::new(1));
132        let alive = Arc::new(AtomicBool::new(true));
133
134        // ----- Step 1: Initialize handshake -----
135        let init_id = next_id(&request_id);
136        let init_req = JsonRpcRequest::new(
137            init_id,
138            METHOD_INITIALIZE,
139            Some(serde_json::json!({
140                "clientInfo": {
141                    "name": "agent-teams",
142                    "version": env!("CARGO_PKG_VERSION")
143                }
144            })),
145        );
146        send_request(&stdin_writer, &init_req).await?;
147        let init_resp = wait_for_response(&mut stdout_reader, init_id).await?;
148        let user_agent = init_resp
149            .result
150            .as_ref()
151            .and_then(|r| r.get("userAgent"))
152            .and_then(|v| v.as_str())
153            .unwrap_or("unknown");
154        debug!(agent = %agent_name, user_agent = %user_agent, "Initialize handshake complete");
155
156        // ----- Step 2: Send `initialized` notification -----
157        let initialized_notif = JsonRpcClientNotification::new(METHOD_INITIALIZED);
158        send_notification(&stdin_writer, &initialized_notif).await?;
159        debug!(agent = %agent_name, "Sent 'initialized' notification");
160
161        // ----- Step 3: Start a thread -----
162        let thread_id_num = next_id(&request_id);
163        let cwd = config
164            .cwd
165            .as_ref()
166            .map(|p| p.display().to_string())
167            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default().display().to_string());
168
169        let thread_req = JsonRpcRequest::new(
170            thread_id_num,
171            METHOD_THREAD_START,
172            Some(serde_json::json!({
173                "cwd": cwd,
174                "approvalPolicy": "never"
175            })),
176        );
177        send_request(&stdin_writer, &thread_req).await?;
178        let thread_resp = wait_for_response(&mut stdout_reader, thread_id_num).await?;
179
180        let thread_id = thread_resp
181            .result
182            .as_ref()
183            .and_then(|r| r.get("thread"))
184            .and_then(|t| t.get("id"))
185            .and_then(|v| v.as_str())
186            .map(|s| s.to_string())
187            .ok_or_else(|| Error::SpawnFailed {
188                name: agent_name.clone(),
189                reason: "thread/start response missing thread.id".into(),
190            })?;
191
192        debug!(
193            agent = %agent_name,
194            thread_id = %thread_id,
195            "Thread created"
196        );
197
198        // ----- Step 4: Send initial prompt as first turn -----
199        let turn_id = next_id(&request_id);
200        let turn_req = JsonRpcRequest::new(
201            turn_id,
202            METHOD_TURN_START,
203            Some(serde_json::json!({
204                "threadId": thread_id,
205                "input": [
206                    {
207                        "type": "text",
208                        "text": initial_prompt
209                    }
210                ]
211            })),
212        );
213        send_request(&stdin_writer, &turn_req).await?;
214
215        // ----- Step 5: Spawn background reader -----
216        let (output_tx, output_rx) = tokio::sync::mpsc::channel(OUTPUT_CHANNEL_SIZE);
217        let reader_alive = alive.clone();
218        let reader_name = agent_name.clone();
219
220        let reader_handle = tokio::spawn(async move {
221            debug!(agent = %reader_name, "Background Codex reader started");
222            let mut line_buf = String::new();
223
224            loop {
225                if !reader_alive.load(Ordering::Relaxed) {
226                    break;
227                }
228
229                line_buf.clear();
230                match stdout_reader.read_line(&mut line_buf).await {
231                    Ok(0) => {
232                        // EOF -- process exited. Idle is a control event: must guarantee delivery.
233                        debug!(agent = %reader_name, "Codex stdout EOF");
234                        reader_alive.store(false, Ordering::Relaxed);
235                        let _ = output_tx.send(AgentOutput::Idle).await;
236                        break;
237                    }
238                    Ok(_) => {
239                        let trimmed = line_buf.trim();
240                        if trimmed.is_empty() {
241                            continue;
242                        }
243
244                        // Try to parse as a JSON-RPC message
245                        match serde_json::from_str::<JsonRpcMessage>(trimmed) {
246                            Ok(JsonRpcMessage::Notification(notif)) => {
247                                if let Some(output) = map_notification_to_output(&notif)
248                                    && send_agent_output(
249                                        &output_tx, output, &reader_alive, &reader_name,
250                                    ).await.is_err()
251                                {
252                                    break;
253                                }
254                            }
255                            Ok(JsonRpcMessage::Response(resp)) => {
256                                // Responses to our requests; check for errors
257                                if let Some(err) = resp.error {
258                                    // Error is a control event: guaranteed delivery
259                                    if output_tx.send(AgentOutput::Error(err.to_string())).await.is_err() {
260                                        reader_alive.store(false, Ordering::Relaxed);
261                                        break;
262                                    }
263                                }
264                            }
265                            Err(e) => {
266                                warn!(
267                                    agent = %reader_name,
268                                    line = %trimmed,
269                                    error = %e,
270                                    "Failed to parse Codex output line"
271                                );
272                            }
273                        }
274                    }
275                    Err(e) => {
276                        error!(agent = %reader_name, error = %e, "Error reading Codex stdout");
277                        // Error is a control event: guaranteed delivery
278                        let _ = output_tx.send(AgentOutput::Error(format!("Read error: {e}"))).await;
279                        reader_alive.store(false, Ordering::Relaxed);
280                        break;
281                    }
282                }
283            }
284            debug!(agent = %reader_name, "Background Codex reader stopped");
285        });
286
287        let session = CodexSession {
288            name: agent_name,
289            child: Some(child),
290            stdin: stdin_writer,
291            thread_id,
292            request_id,
293            output_rx: Some(output_rx),
294            alive,
295            reader_handle: Some(reader_handle),
296        };
297
298        Ok(Box::new(session))
299    }
300}
301
302// ---------------------------------------------------------------------------
303// CodexSession
304// ---------------------------------------------------------------------------
305
306/// A running Codex agent session.
307struct CodexSession {
308    name: String,
309    child: Option<Child>,
310    stdin: Arc<Mutex<BufWriter<tokio::process::ChildStdin>>>,
311    thread_id: String,
312    request_id: Arc<AtomicU64>,
313    output_rx: Option<tokio::sync::mpsc::Receiver<AgentOutput>>,
314    alive: Arc<AtomicBool>,
315    reader_handle: Option<JoinHandle<()>>,
316}
317
318#[async_trait]
319impl AgentSession for CodexSession {
320    fn name(&self) -> &str {
321        &self.name
322    }
323
324    async fn send_input(&mut self, input: &str) -> Result<()> {
325        if !self.alive.load(Ordering::Relaxed) {
326            return Err(Error::AgentNotAlive {
327                name: self.name.clone(),
328            });
329        }
330
331        let id = next_id(&self.request_id);
332        let req = JsonRpcRequest::new(
333            id,
334            METHOD_TURN_START,
335            Some(serde_json::json!({
336                "threadId": self.thread_id,
337                "input": [
338                    {
339                        "type": "text",
340                        "text": input
341                    }
342                ]
343            })),
344        );
345        send_request(&self.stdin, &req).await
346    }
347
348    fn output_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<AgentOutput>> {
349        self.output_rx.take()
350    }
351
352    async fn is_alive(&self) -> bool {
353        self.alive.load(Ordering::Relaxed)
354    }
355
356    async fn shutdown(&mut self) -> Result<()> {
357        info!(agent = %self.name, "Shutting down Codex session");
358        self.alive.store(false, Ordering::Relaxed);
359
360        // Abort the reader task so it doesn't block on stdout reads
361        if let Some(handle) = self.reader_handle.take() {
362            handle.abort();
363            let _ = handle.await;
364        }
365
366        // Close stdin to signal the child to exit
367        {
368            let mut writer = self.stdin.lock().await;
369            let _ = writer.shutdown().await;
370        }
371
372        // Wait briefly for the child to exit, then kill if needed
373        if let Some(ref mut child) = self.child {
374            let timeout =
375                tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
376
377            if timeout.is_err() {
378                warn!(agent = %self.name, "Codex child did not exit in time, killing");
379                let _ = child.kill().await;
380            }
381        }
382
383        Ok(())
384    }
385
386    async fn force_kill(&mut self) -> Result<()> {
387        info!(agent = %self.name, "Force-killing Codex session");
388        self.alive.store(false, Ordering::Relaxed);
389
390        // Abort the reader task first
391        if let Some(handle) = self.reader_handle.take() {
392            handle.abort();
393            let _ = handle.await;
394        }
395
396        if let Some(ref mut child) = self.child {
397            child.kill().await.map_err(|e| {
398                Error::CodexProtocol {
399                    reason: format!(
400                        "Failed to kill Codex process for {}: {e}",
401                        self.name
402                    ),
403                }
404            })?;
405        }
406
407        Ok(())
408    }
409}
410
411impl Drop for CodexSession {
412    fn drop(&mut self) {
413        // Abort the reader task if it was not already taken by shutdown/force_kill.
414        // The child process is handled by kill_on_drop(true).
415        if let Some(handle) = self.reader_handle.take() {
416            handle.abort();
417        }
418    }
419}
420
421// ---------------------------------------------------------------------------
422// Helpers
423// ---------------------------------------------------------------------------
424
/// Allocate a JSON-RPC request id: yields the counter's current value and
/// advances it by one in a single atomic step, so successive calls return
/// consecutive ids.
fn next_id(counter: &AtomicU64) -> u64 {
    AtomicU64::fetch_add(counter, 1, Ordering::Relaxed)
}
429
430/// Serialize a JSON-RPC request and write it to the writer followed by a newline.
431async fn send_request(
432    writer: &Arc<Mutex<BufWriter<tokio::process::ChildStdin>>>,
433    request: &JsonRpcRequest,
434) -> Result<()> {
435    let line = serde_json::to_string(request)?;
436    let mut w = writer.lock().await;
437    w.write_all(line.as_bytes())
438        .await
439        .map_err(|e| Error::CodexProtocol { reason: format!("Failed to write to Codex stdin: {e}") })?;
440    w.write_all(b"\n")
441        .await
442        .map_err(|e| Error::CodexProtocol { reason: format!("Failed to write newline to Codex stdin: {e}") })?;
443    w.flush()
444        .await
445        .map_err(|e| Error::CodexProtocol { reason: format!("Failed to flush Codex stdin: {e}") })?;
446    Ok(())
447}
448
449/// Serialize a client notification and write it to the writer followed by a newline.
450async fn send_notification(
451    writer: &Arc<Mutex<BufWriter<tokio::process::ChildStdin>>>,
452    notification: &JsonRpcClientNotification,
453) -> Result<()> {
454    let line = serde_json::to_string(notification)?;
455    let mut w = writer.lock().await;
456    w.write_all(line.as_bytes())
457        .await
458        .map_err(|e| Error::CodexProtocol { reason: format!("Failed to write notification to Codex stdin: {e}") })?;
459    w.write_all(b"\n")
460        .await
461        .map_err(|e| Error::CodexProtocol { reason: format!("Failed to write newline to Codex stdin: {e}") })?;
462    w.flush()
463        .await
464        .map_err(|e| Error::CodexProtocol { reason: format!("Failed to flush Codex stdin: {e}") })?;
465    Ok(())
466}
467
468/// Read lines from the reader until we find a response matching the given `id`.
469/// Returns the response. Non-matching lines (notifications, other responses) are
470/// consumed and discarded during this blocking wait.
471async fn wait_for_response(
472    reader: &mut BufReader<tokio::process::ChildStdout>,
473    expected_id: u64,
474) -> Result<JsonRpcResponse> {
475    let expected_val = serde_json::Value::Number(expected_id.into());
476    let mut line_buf = String::new();
477
478    let timeout_duration = std::time::Duration::from_secs(30);
479    let deadline = tokio::time::Instant::now() + timeout_duration;
480
481    loop {
482        line_buf.clear();
483
484        let read_result = tokio::time::timeout_at(deadline, reader.read_line(&mut line_buf))
485            .await
486            .map_err(|_| Error::Timeout { seconds: 30 })?
487            .map_err(|e| Error::CodexProtocol { reason: format!("Read error waiting for response: {e}") })?;
488
489        if read_result == 0 {
490            return Err(Error::CodexProtocol {
491                reason: "Codex process closed stdout before responding".into(),
492            });
493        }
494
495        let trimmed = line_buf.trim();
496        if trimmed.is_empty() {
497            continue;
498        }
499
500        // Try to parse as a response with matching id
501        if let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(trimmed)
502            && resp.id == expected_val
503        {
504            if let Some(ref err) = resp.error {
505                return Err(Error::CodexProtocol { reason: format!("Codex RPC error: {err}") });
506            }
507            return Ok(resp);
508        }
509        // Notifications and non-matching responses are silently skipped during handshake.
510    }
511}
512
513fn map_notification_to_output(notif: &JsonRpcNotification) -> Option<AgentOutput> {
514    match notif.method.as_str() {
515        EVENT_AGENT_MESSAGE_DELTA => {
516            // Extract streaming text delta from `params.delta`
517            let text = notif
518                .params
519                .as_ref()
520                .and_then(|p| p.get("delta"))
521                .and_then(|v| v.as_str())
522                .unwrap_or_default();
523
524            if text.is_empty() {
525                None
526            } else {
527                Some(AgentOutput::Delta(text.to_string()))
528            }
529        }
530        EVENT_COMMAND_OUTPUT_DELTA => {
531            // Command execution output delta from `params.delta`
532            let text = notif
533                .params
534                .as_ref()
535                .and_then(|p| p.get("delta"))
536                .and_then(|v| v.as_str())
537                .unwrap_or_default();
538
539            if text.is_empty() {
540                None
541            } else {
542                Some(AgentOutput::Delta(text.to_string()))
543            }
544        }
545        EVENT_ITEM_COMPLETED => {
546            // Extract text content from a completed agentMessage item.
547            // The item structure is: {type: "agentMessage", content: [{type: "text", text: "..."}]}
548            let item = notif
549                .params
550                .as_ref()
551                .and_then(|p| p.get("item"));
552
553            let is_agent_message = item
554                .and_then(|i| i.get("type"))
555                .and_then(|t| t.as_str())
556                == Some("agentMessage");
557
558            if !is_agent_message {
559                return None;
560            }
561
562            // Collect ALL text blocks from content array (not just the first)
563            let text: String = item
564                .and_then(|i| i.get("content"))
565                .and_then(|c| c.as_array())
566                .map(|arr| {
567                    arr.iter()
568                        .filter_map(|part| {
569                            if part.get("type").and_then(|t| t.as_str()) == Some("text") {
570                                part.get("text").and_then(|t| t.as_str())
571                            } else {
572                                None
573                            }
574                        })
575                        .collect::<Vec<_>>()
576                        .join("")
577                })
578                .unwrap_or_default();
579
580            if text.is_empty() {
581                None
582            } else {
583                Some(AgentOutput::Message(text))
584            }
585        }
586        EVENT_TURN_COMPLETED => Some(AgentOutput::TurnComplete),
587        EVENT_ERROR => {
588            let message = notif
589                .params
590                .as_ref()
591                .and_then(|p| p.get("message"))
592                .and_then(|v| v.as_str())
593                .unwrap_or("Unknown error");
594            Some(AgentOutput::Error(message.to_string()))
595        }
596        // Informational events -- no output needed
597        EVENT_THREAD_STARTED | EVENT_TURN_STARTED | EVENT_ITEM_STARTED => None,
598        // Ignore internal/legacy codex events and other unknowns
599        other => {
600            if !other.starts_with("codex/event/") {
601                debug!(method = %notif.method, "Unhandled Codex notification");
602            }
603            None
604        }
605    }
606}