use std::io;
use std::pin::Pin;
use std::process::ExitStatus;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::process::Child;
use tokio::sync::Mutex;
use agent_team_mail_core::event_log::{EventFields, emit_event_best_effort};
use crate::config::AgentMcpConfig;
pub struct RawChildIo {
pub stdin: Arc<Mutex<Box<dyn AsyncWrite + Send + Unpin>>>,
pub stdout: Box<dyn AsyncRead + Send + Unpin>,
pub exit_status: Arc<Mutex<Option<ExitStatus>>>,
pub process: Arc<Mutex<Option<Child>>>,
pub idle_flag: Option<Arc<AtomicBool>>,
}
#[async_trait]
pub(crate) trait CodexTransport: Send + Sync + std::fmt::Debug {
async fn spawn(&self) -> anyhow::Result<RawChildIo>;
fn is_idle(&self) -> bool {
false
}
}
#[derive(Debug)]
pub(crate) struct McpTransport {
config: AgentMcpConfig,
team: String,
}
impl McpTransport {
pub fn new(config: AgentMcpConfig, team: impl Into<String>) -> Self {
let team = team.into();
emit_event_best_effort(EventFields {
level: "info",
source: "atm-agent-mcp",
action: "transport_init",
team: Some(team.clone()),
result: Some("mcp".to_string()),
..Default::default()
});
Self { config, team }
}
}
impl Drop for McpTransport {
fn drop(&mut self) {
emit_event_best_effort(EventFields {
level: "info",
source: "atm-agent-mcp",
action: "transport_shutdown",
team: Some(self.team.clone()),
result: Some("mcp".to_string()),
..Default::default()
});
}
}
#[async_trait]
impl CodexTransport for McpTransport {
async fn spawn(&self) -> anyhow::Result<RawChildIo> {
use tokio::process::Command;
let mut cmd = Command::new(&self.config.codex_bin);
cmd.arg("mcp-server");
if let Some(ref model) = self.config.model {
cmd.arg("-m").arg(model);
}
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null());
let mut child = cmd.spawn()?;
let stdin = child.stdin.take().expect("child stdin must be piped");
let stdout = child.stdout.take().expect("child stdout must be piped");
let exit_status: Arc<Mutex<Option<std::process::ExitStatus>>> =
Arc::new(Mutex::new(None));
let shared_stdin = Arc::new(Mutex::new(
Box::new(stdin) as Box<dyn AsyncWrite + Send + Unpin>
));
let process: Arc<Mutex<Option<tokio::process::Child>>> =
Arc::new(Mutex::new(Some(child)));
Ok(RawChildIo {
stdin: shared_stdin,
stdout: Box::new(stdout) as Box<dyn AsyncRead + Send + Unpin>,
exit_status,
process,
idle_flag: None,
})
}
}
#[derive(Debug)]
pub(crate) struct JsonCodecTransport {
config: AgentMcpConfig,
team: String,
idle_flag: Arc<AtomicBool>,
}
impl JsonCodecTransport {
pub fn new(config: AgentMcpConfig, team: impl Into<String>) -> Self {
let team = team.into();
emit_event_best_effort(EventFields {
level: "info",
source: "atm-agent-mcp",
action: "transport_init",
team: Some(team.clone()),
result: Some("json".to_string()),
..Default::default()
});
Self {
config,
team,
idle_flag: Arc::new(AtomicBool::new(false)),
}
}
}
impl Drop for JsonCodecTransport {
fn drop(&mut self) {
emit_event_best_effort(EventFields {
level: "info",
source: "atm-agent-mcp",
action: "transport_shutdown",
team: Some(self.team.clone()),
result: Some("json".to_string()),
..Default::default()
});
}
}
#[derive(Debug)]
enum TransportEventType {
AgentMessage,
ToolCall,
ToolResult,
FileChange,
Idle,
Done,
Unknown,
}
fn parse_event_type(line: &str) -> TransportEventType {
serde_json::from_str::<serde_json::Value>(line)
.ok()
.and_then(|v| v.get("type").and_then(|t| t.as_str()).map(|s| s.to_string()))
.map(|t| match t.as_str() {
"agent_message" => TransportEventType::AgentMessage,
"tool_call" => TransportEventType::ToolCall,
"tool_result" => TransportEventType::ToolResult,
"file_change" => TransportEventType::FileChange,
"idle" => TransportEventType::Idle,
"done" => TransportEventType::Done,
_ => TransportEventType::Unknown,
})
.unwrap_or(TransportEventType::Unknown)
}
#[async_trait]
impl CodexTransport for JsonCodecTransport {
async fn spawn(&self) -> anyhow::Result<RawChildIo> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
let mut cmd = Command::new(&self.config.codex_bin);
cmd.arg("exec").arg("--json");
if let Some(ref model) = self.config.model {
cmd.arg("-m").arg(model);
}
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null());
let mut child = cmd.spawn()?;
let stdin = child.stdin.take().expect("child stdin must be piped");
let child_stdout = child.stdout.take().expect("child stdout must be piped");
let (mut duplex_write, duplex_read) = tokio::io::duplex(65_536);
let idle_flag = Arc::clone(&self.idle_flag);
let team_for_task = self.team.clone();
tokio::spawn(async move {
let reader = BufReader::new(child_stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
match parse_event_type(&line) {
TransportEventType::Idle => {
idle_flag.store(true, Ordering::SeqCst);
emit_event_best_effort(EventFields {
level: "info",
source: "atm-agent-mcp",
action: "idle_detected",
team: Some(team_for_task.clone()),
result: Some("json".to_string()),
..Default::default()
});
}
TransportEventType::Done => {
emit_event_best_effort(EventFields {
level: "info",
source: "atm-agent-mcp",
action: "codex_done",
team: Some(team_for_task.clone()),
result: Some("json".to_string()),
..Default::default()
});
idle_flag.store(false, Ordering::SeqCst);
}
_ => {
idle_flag.store(false, Ordering::SeqCst);
}
}
let bytes = format!("{line}\n");
if duplex_write.write_all(bytes.as_bytes()).await.is_err() {
break;
}
}
});
let exit_status: Arc<Mutex<Option<ExitStatus>>> = Arc::new(Mutex::new(None));
let shared_stdin = Arc::new(Mutex::new(
Box::new(stdin) as Box<dyn AsyncWrite + Send + Unpin>
));
let process: Arc<Mutex<Option<Child>>> = Arc::new(Mutex::new(Some(child)));
Ok(RawChildIo {
stdin: shared_stdin,
stdout: Box::new(duplex_read) as Box<dyn AsyncRead + Send + Unpin>,
exit_status,
process,
idle_flag: Some(Arc::clone(&self.idle_flag)),
})
}
fn is_idle(&self) -> bool {
self.idle_flag.load(Ordering::SeqCst)
}
}
pub(crate) fn make_transport(config: &AgentMcpConfig, team: &str) -> Box<dyn CodexTransport> {
match config.transport.as_deref() {
None | Some("mcp") => Box::new(McpTransport::new(config.clone(), team)),
Some("json") => Box::new(JsonCodecTransport::new(config.clone(), team)),
Some("mock") => {
let (transport, _handle) = MockTransport::new_with_handle();
Box::new(transport)
}
Some(other) => {
tracing::warn!(
transport = %other,
"unknown transport '{}'; falling back to McpTransport",
other
);
Box::new(McpTransport::new(config.clone(), team))
}
}
}
pub struct MockTransportHandle {
pub response_tx: tokio::sync::mpsc::UnboundedSender<String>,
pub request_rx: tokio::sync::mpsc::UnboundedReceiver<String>,
}
#[derive(Debug)]
pub struct MockTransport {
#[expect(
dead_code,
reason = "keepalive: prevents the response channel from closing if the \
MockTransportHandle is dropped before spawn's background task exits"
)]
response_tx: tokio::sync::mpsc::UnboundedSender<String>,
response_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<String>>>>,
request_tx: tokio::sync::mpsc::UnboundedSender<String>,
request_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<String>>>>,
}
impl MockTransport {
pub fn new_with_handle() -> (Self, MockTransportHandle) {
let (response_tx, response_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let transport = Self {
response_tx: response_tx.clone(),
response_rx: Arc::new(Mutex::new(Some(response_rx))),
request_tx,
request_rx: Arc::new(Mutex::new(Some(request_rx))),
};
let req_rx = transport
.request_rx
.try_lock()
.expect("freshly created Mutex cannot be contended")
.take()
.expect("Option is Some on construction");
let handle = MockTransportHandle {
response_tx,
request_rx: req_rx,
};
(transport, handle)
}
pub async fn spawn(&self) -> anyhow::Result<RawChildIo> {
<Self as CodexTransport>::spawn(self).await
}
}
#[async_trait]
impl CodexTransport for MockTransport {
async fn spawn(&self) -> anyhow::Result<RawChildIo> {
use tokio::io::AsyncWriteExt as _;
let (mut stdout_write, stdout_read) = tokio::io::duplex(65_536);
let mut response_rx = self
.response_rx
.lock()
.await
.take()
.ok_or_else(|| anyhow::anyhow!("MockTransport::spawn called more than once"))?;
tokio::spawn(async move {
while let Some(line) = response_rx.recv().await {
let bytes = format!("{line}\n");
if stdout_write.write_all(bytes.as_bytes()).await.is_err() {
break;
}
}
});
let stdin_capturer = SniffWriter::new(self.request_tx.clone());
Ok(RawChildIo {
stdin: Arc::new(Mutex::new(
Box::new(stdin_capturer) as Box<dyn AsyncWrite + Send + Unpin>
)),
stdout: Box::new(stdout_read) as Box<dyn AsyncRead + Send + Unpin>,
exit_status: Arc::new(Mutex::new(None)),
process: Arc::new(Mutex::new(None)),
idle_flag: None,
})
}
}
struct SniffWriter {
tx: tokio::sync::mpsc::UnboundedSender<String>,
buf: Vec<u8>,
}
impl SniffWriter {
fn new(tx: tokio::sync::mpsc::UnboundedSender<String>) -> Self {
Self { tx, buf: Vec::new() }
}
}
impl AsyncWrite for SniffWriter {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.buf.extend_from_slice(buf);
while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
let line: Vec<u8> = self.buf.drain(..=pos).collect();
let s = String::from_utf8_lossy(&line).trim().to_string();
if !s.is_empty() {
let _ = self.tx.send(s);
}
}
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn make_transport_returns_mcp_for_none() {
let config = AgentMcpConfig::default();
let _t = make_transport(&config, "default");
}
#[test]
fn make_transport_returns_mcp_for_explicit_mcp() {
let config = AgentMcpConfig {
transport: Some("mcp".to_string()),
..Default::default()
};
let _t = make_transport(&config, "test-team");
}
#[test]
fn make_transport_returns_json_codec_for_json() {
let config = AgentMcpConfig {
transport: Some("json".to_string()),
..Default::default()
};
let _t = make_transport(&config, "test-team");
}
#[test]
fn make_transport_returns_mock_transport() {
let config = AgentMcpConfig {
transport: Some("mock".to_string()),
..Default::default()
};
let _t = make_transport(&config, "test-team");
}
#[test]
fn make_transport_falls_back_for_unknown() {
let config = AgentMcpConfig {
transport: Some("unknown-transport".to_string()),
..Default::default()
};
let _t = make_transport(&config, "test-team");
}
#[test]
fn mock_transport_new_with_handle_does_not_panic() {
let (_transport, _handle) = MockTransport::new_with_handle();
}
#[test]
fn parse_event_type_detects_idle() {
assert!(matches!(
parse_event_type(r#"{"type":"idle"}"#),
TransportEventType::Idle
));
assert!(matches!(
parse_event_type(r#"{"type": "idle", "extra": 42}"#),
TransportEventType::Idle
));
}
#[test]
fn parse_event_type_detects_done() {
assert!(matches!(
parse_event_type(r#"{"type":"done"}"#),
TransportEventType::Done
));
}
#[test]
fn parse_event_type_handles_all_known_types() {
assert!(matches!(
parse_event_type(r#"{"type":"agent_message"}"#),
TransportEventType::AgentMessage
));
assert!(matches!(
parse_event_type(r#"{"type":"tool_call"}"#),
TransportEventType::ToolCall
));
assert!(matches!(
parse_event_type(r#"{"type":"tool_result"}"#),
TransportEventType::ToolResult
));
assert!(matches!(
parse_event_type(r#"{"type":"file_change"}"#),
TransportEventType::FileChange
));
}
#[test]
fn parse_event_type_returns_unknown_for_unrecognised() {
assert!(matches!(
parse_event_type(r#"{"type":"unknown_type"}"#),
TransportEventType::Unknown
));
assert!(matches!(
parse_event_type(r#"{"foo":"bar"}"#),
TransportEventType::Unknown
));
assert!(matches!(
parse_event_type("not json"),
TransportEventType::Unknown
));
assert!(matches!(
parse_event_type(""),
TransportEventType::Unknown
));
}
#[test]
fn json_codec_transport_is_idle_default_false() {
let t = JsonCodecTransport::new(AgentMcpConfig::default(), "test-team");
assert!(!t.is_idle());
}
#[test]
fn json_codec_transport_idle_flag_round_trip() {
let t = JsonCodecTransport::new(AgentMcpConfig::default(), "test-team");
assert!(!t.is_idle());
t.idle_flag.store(true, Ordering::SeqCst);
assert!(t.is_idle());
t.idle_flag.store(false, Ordering::SeqCst);
assert!(!t.is_idle());
}
}