#![cfg(feature = "integration-tests")]
use codex_codes::{
AppServerBuilder, AsyncClient, ClientInfo, InitializeCapabilities, InitializeParams,
Notification, ServerMessage, ServerRequest, SyncClient, ThreadStartParams, TurnStartParams,
UserInput,
};
/// Confirms that the locally installed Codex CLI passes the crate's async
/// version check.
#[tokio::test]
async fn test_codex_cli_version() {
    let version_check = codex_codes::version::check_codex_version_async().await;
    version_check.expect("Failed to check Codex CLI version");
}
/// Boots the app-server through [`AsyncClient::start`], opens a thread with
/// default (empty-object) parameters, and checks the returned thread id is
/// non-empty before shutting the server down.
#[tokio::test]
async fn test_async_client_start_and_thread_start() {
    let mut client = AsyncClient::start()
        .await
        .expect("Failed to start app-server");

    let default_params: ThreadStartParams =
        serde_json::from_value(serde_json::json!({})).unwrap();
    let started = client
        .thread_start(&default_params)
        .await
        .expect("Failed to start thread");

    let thread_id = &started.thread.id;
    assert!(!thread_id.is_empty(), "thread_id must not be empty");

    client.shutdown().await.expect("Failed to shutdown");
}
/// Runs one arithmetic turn through the async client.
///
/// Passes when `turn/completed` is observed and at least one agent message
/// delta contains the character `4`. Every server request seen mid-turn is
/// answered with `{"decision": "accept"}`; the drain loop gives up after 101
/// messages.
#[tokio::test]
async fn test_async_client_basic_turn() {
    let mut client = AsyncClient::start()
        .await
        .expect("Failed to start app-server");
    let thread = client
        .thread_start(&serde_json::from_value::<ThreadStartParams>(serde_json::json!({})).unwrap())
        .await
        .expect("Failed to start thread");

    let params = TurnStartParams {
        thread_id: thread.thread.id.clone(),
        input: vec![UserInput::Text {
            text: "What is 2 + 2? Reply with just the number.".to_string(),
            text_elements: None,
        }],
        approval_policy: None,
        approvals_reviewer: None,
        cwd: None,
        effort: None,
        model: None,
        output_schema: None,
        personality: None,
        sandbox_policy: None,
        service_tier: None,
        summary: None,
    };
    client
        .turn_start(&params)
        .await
        .expect("Failed to start turn");

    let mut saw_four = false;
    let mut completed = false;
    let mut received = 0;
    while let Some(message) = client.next_message().await.expect("Failed to read message") {
        received += 1;
        match message {
            ServerMessage::Notification(Notification::TurnCompleted(_)) => {
                completed = true;
                break;
            }
            ServerMessage::Notification(Notification::AgentMessageDelta(delta)) => {
                saw_four |= delta.delta.contains('4');
            }
            ServerMessage::Request { id, .. } => {
                client
                    .respond(id, &serde_json::json!({"decision": "accept"}))
                    .await
                    .expect("Failed to respond");
            }
            ServerMessage::Notification(_) => {}
        }
        if received > 100 {
            break;
        }
    }

    assert!(completed, "Turn should have completed");
    assert!(saw_four, "Response should contain '4'");
    client.shutdown().await.expect("Failed to shutdown");
}
/// Spawns the app-server from an explicit [`AppServerBuilder`] and performs
/// the `initialize` handshake manually with custom client info and
/// capabilities.
///
/// Checks that the server reports a non-empty `user_agent` and that a thread
/// can still be started afterwards.
#[tokio::test]
async fn test_async_client_custom_initialize() {
    let builder = AppServerBuilder::new();
    let mut client = AsyncClient::spawn(builder)
        .await
        .expect("Failed to spawn app-server");

    let client_info = ClientInfo {
        name: "codex-codes-test".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        title: Some("Integration Test".to_string()),
    };
    let capabilities = InitializeCapabilities {
        experimental_api: Some(false),
        opt_out_notification_methods: None,
        request_attestation: None,
    };
    let init_params = InitializeParams {
        client_info,
        capabilities: Some(capabilities),
    };
    let init_response = client
        .initialize(&init_params)
        .await
        .expect("Failed to initialize");
    assert!(
        !init_response.user_agent.is_empty(),
        "user_agent should not be empty"
    );

    let thread_params: ThreadStartParams =
        serde_json::from_value(serde_json::json!({})).unwrap();
    let started = client
        .thread_start(&thread_params)
        .await
        .expect("Failed to start thread after custom init");
    assert!(!started.thread.id.is_empty());

    client.shutdown().await.expect("Failed to shutdown");
}
/// Blocking counterpart of the async start test: boots the app-server with
/// [`SyncClient`] and checks that the first thread receives a non-empty id.
#[test]
fn test_sync_client_start_and_thread_start() {
    let mut client = SyncClient::start().expect("Failed to start app-server");
    let params: ThreadStartParams = serde_json::from_value(serde_json::json!({})).unwrap();
    let response = client
        .thread_start(&params)
        .expect("Failed to start thread");
    assert!(
        !response.thread.id.is_empty(),
        "thread_id must not be empty"
    );
}
/// Blocking counterpart of `test_async_client_basic_turn`.
///
/// Iterates [`SyncClient::events`] until `turn/completed` (or 101 messages)
/// and requires that some agent message delta contains the character `4`.
///
/// NOTE(review): server requests are consumed but never answered here, unlike
/// the async variant. If the server ever asks for approval on this prompt, the
/// turn may stall. Confirm whether `SyncClient` exposes a respond API usable
/// while `events()` is being iterated.
#[test]
fn test_sync_client_basic_turn() {
    let mut client = SyncClient::start().expect("Failed to start app-server");
    let thread = client
        .thread_start(&serde_json::from_value::<ThreadStartParams>(serde_json::json!({})).unwrap())
        .expect("Failed to start thread");

    let params = TurnStartParams {
        thread_id: thread.thread.id.clone(),
        input: vec![UserInput::Text {
            text: "What is 2 + 2? Reply with just the number.".to_string(),
            text_elements: None,
        }],
        approval_policy: None,
        approvals_reviewer: None,
        cwd: None,
        effort: None,
        model: None,
        output_schema: None,
        personality: None,
        sandbox_policy: None,
        service_tier: None,
        summary: None,
    };
    client.turn_start(&params).expect("Failed to start turn");

    let mut saw_four = false;
    let mut completed = false;
    let mut received = 0;
    for next in client.events() {
        let message = next.expect("Failed to read message");
        received += 1;
        if let ServerMessage::Notification(notification) = message {
            match notification {
                Notification::AgentMessageDelta(delta) => {
                    saw_four |= delta.delta.contains('4');
                }
                Notification::TurnCompleted(_) => {
                    completed = true;
                    break;
                }
                _ => {}
            }
        }
        if received > 100 {
            break;
        }
    }

    assert!(completed, "Turn should have completed");
    assert!(saw_four, "Response should contain '4'");
}
/// Checks that conversational context persists across turns on one thread.
///
/// The first turn asks the agent to remember `42`; the second asks it to
/// repeat the number. Each turn is drained until `turn/completed` (giving up
/// after 101 messages), and every server request is answered with
/// `{"decision": "accept"}`. Both turns must complete, and the concatenated
/// agent text of the second turn must contain `42`.
#[tokio::test]
async fn test_async_client_multi_turn() {
    let mut client = AsyncClient::start()
        .await
        .expect("Failed to start app-server");
    let thread = client
        .thread_start(&serde_json::from_value::<ThreadStartParams>(serde_json::json!({})).unwrap())
        .await
        .expect("Failed to start thread");
    client
        .turn_start(&TurnStartParams {
            thread_id: thread.thread.id.clone(),
            input: vec![UserInput::Text {
                text: "Remember the number 42. Just say OK.".to_string(),
                text_elements: None,
            }],
            approval_policy: None,
            approvals_reviewer: None,
            cwd: None,
            effort: None,
            model: None,
            output_schema: None,
            personality: None,
            sandbox_policy: None,
            service_tier: None,
            summary: None,
        })
        .await
        .expect("Failed to start first turn");

    // The first turn must be fully drained before the second `turn_start`.
    // If the drain stops at the cap or at stream end instead, the second turn
    // would be issued while the first is still in flight.
    let mut first_turn_completed = false;
    let mut message_count = 0;
    while let Some(msg) = client.next_message().await.expect("read") {
        message_count += 1;
        match msg {
            ServerMessage::Notification(Notification::TurnCompleted(_)) => {
                first_turn_completed = true;
                break;
            }
            ServerMessage::Notification(_) => {}
            ServerMessage::Request { id, .. } => {
                client
                    .respond(id, &serde_json::json!({"decision": "accept"}))
                    .await
                    .expect("Failed to respond");
            }
        }
        if message_count > 100 {
            break;
        }
    }
    assert!(first_turn_completed, "First turn should have completed");

    client
        .turn_start(&TurnStartParams {
            thread_id: thread.thread.id.clone(),
            input: vec![UserInput::Text {
                text: "What number did I ask you to remember? Reply with just the number."
                    .to_string(),
                text_elements: None,
            }],
            approval_policy: None,
            approvals_reviewer: None,
            cwd: None,
            effort: None,
            model: None,
            output_schema: None,
            personality: None,
            sandbox_policy: None,
            service_tier: None,
            summary: None,
        })
        .await
        .expect("Failed to start second turn");

    // Streaming can split "42" across deltas (e.g. "4" then "2"), so match on
    // the concatenated reply rather than on each delta individually.
    let mut second_reply = String::new();
    let mut second_turn_completed = false;
    let mut message_count = 0;
    while let Some(msg) = client.next_message().await.expect("read") {
        message_count += 1;
        match msg {
            ServerMessage::Notification(Notification::AgentMessageDelta(d)) => {
                second_reply.push_str(&d.delta);
            }
            ServerMessage::Notification(Notification::TurnCompleted(_)) => {
                second_turn_completed = true;
                break;
            }
            ServerMessage::Notification(_) => {}
            ServerMessage::Request { id, .. } => {
                client
                    .respond(id, &serde_json::json!({"decision": "accept"}))
                    .await
                    .expect("Failed to respond");
            }
        }
        if message_count > 100 {
            break;
        }
    }
    assert!(second_turn_completed, "Second turn should have completed");
    assert!(
        second_reply.contains("42"),
        "Agent should remember 42 from the first turn; reply was {second_reply:?}"
    );
    client.shutdown().await.expect("Failed to shutdown");
}
/// Consumes a turn through the async [`AsyncClient::events`] stream instead
/// of `next_message`.
///
/// Requires that both `turn/started` and `turn/completed` are observed within
/// 101 messages.
///
/// NOTE(review): unlike the other async tests, server requests are not
/// answered and `shutdown()` is never called here. Confirm whether the stream
/// borrows `client` (which would prevent responding inside the loop) and
/// whether dropping the client is enough cleanup.
#[tokio::test]
async fn test_async_client_event_stream() {
    let mut client = AsyncClient::start()
        .await
        .expect("Failed to start app-server");
    let thread = client
        .thread_start(&serde_json::from_value::<ThreadStartParams>(serde_json::json!({})).unwrap())
        .await
        .expect("Failed to start thread");

    let params = TurnStartParams {
        thread_id: thread.thread.id.clone(),
        input: vec![UserInput::Text {
            text: "Say hello.".to_string(),
            text_elements: None,
        }],
        approval_policy: None,
        approvals_reviewer: None,
        cwd: None,
        effort: None,
        model: None,
        output_schema: None,
        personality: None,
        sandbox_policy: None,
        service_tier: None,
        summary: None,
    };
    client
        .turn_start(&params)
        .await
        .expect("Failed to start turn");

    let mut events = client.events();
    let (mut saw_started, mut saw_completed) = (false, false);
    let mut received = 0;
    while let Some(item) = events.next().await {
        let message = item.expect("Failed to read event");
        received += 1;
        match message {
            ServerMessage::Notification(Notification::TurnStarted(_)) => saw_started = true,
            ServerMessage::Notification(Notification::TurnCompleted(_)) => {
                saw_completed = true;
                break;
            }
            _ => {}
        }
        if received > 100 {
            break;
        }
    }

    assert!(saw_started, "Should have received turn/started");
    assert!(saw_completed, "Should have received turn/completed");
}
/// Audits every server message of a real turn against the crate's typed
/// bindings.
///
/// The turn asks the agent to run `ls`. Each typed [`Notification`] variant
/// seen is tallied and printed to stderr. Wire methods are recorded whenever
/// they decode as `Notification::Unknown` or as a server request for which
/// `is_unknown()` is true, and the test fails if any were recorded.
///
/// The variant match has no wildcard arm, so adding a variant to
/// `Notification` requires updating this list. The loop stops at
/// `turn/completed`, at stream end, or after 501 messages.
#[tokio::test]
async fn test_typed_message_audit_strict() {
    use std::collections::BTreeMap;
    // Cap on messages processed. It is enforced in the loop condition so that it
    // also bounds unknown notifications, which `continue` past the loop body.
    const MAX_MESSAGES: u32 = 500;
    let mut client = AsyncClient::start()
        .await
        .expect("Failed to start app-server");
    let thread = client
        .thread_start(&serde_json::from_value::<ThreadStartParams>(serde_json::json!({})).unwrap())
        .await
        .expect("Failed to start thread");
    client
        .turn_start(&TurnStartParams {
            thread_id: thread.thread.id.clone(),
            input: vec![UserInput::Text {
                text: "Run `ls` in the current directory, then briefly describe what you saw."
                    .to_string(),
                text_elements: None,
            }],
            approval_policy: None,
            approvals_reviewer: None,
            cwd: None,
            effort: None,
            model: None,
            output_schema: None,
            personality: None,
            sandbox_policy: None,
            service_tier: None,
            summary: None,
        })
        .await
        .expect("Failed to start turn");
    let mut typed_counts: BTreeMap<&'static str, u32> = BTreeMap::new();
    let mut unknown_methods: BTreeMap<String, u32> = BTreeMap::new();
    let mut message_count: u32 = 0;
    while message_count <= MAX_MESSAGES {
        let msg = match client.next_message().await.expect("read") {
            Some(msg) => msg,
            None => break,
        };
        message_count += 1;
        match msg {
            ServerMessage::Notification(n) => {
                let variant = match &n {
                    Notification::ThreadStarted(_) => "ThreadStarted",
                    Notification::ThreadStatusChanged(_) => "ThreadStatusChanged",
                    Notification::ThreadTokenUsageUpdated(_) => "ThreadTokenUsageUpdated",
                    Notification::TurnStarted(_) => "TurnStarted",
                    Notification::TurnCompleted(_) => "TurnCompleted",
                    Notification::ItemStarted(_) => "ItemStarted",
                    Notification::ItemCompleted(_) => "ItemCompleted",
                    Notification::AgentMessageDelta(_) => "AgentMessageDelta",
                    Notification::CmdOutputDelta(_) => "CmdOutputDelta",
                    Notification::FileChangeOutputDelta(_) => "FileChangeOutputDelta",
                    Notification::ReasoningDelta(_) => "ReasoningDelta",
                    Notification::Error(_) => "Error",
                    Notification::AccountRateLimitsUpdated(_) => "AccountRateLimitsUpdated",
                    Notification::McpServerStartupStatusUpdated(_) => {
                        "McpServerStartupStatusUpdated"
                    }
                    Notification::RemoteControlStatusChanged(_) => "RemoteControlStatusChanged",
                    Notification::McpServerOauthLoginCompleted(_) => "McpServerOauthLoginCompleted",
                    Notification::FileChangePatchUpdated(_) => "FileChangePatchUpdated",
                    Notification::PlanDelta(_) => "PlanDelta",
                    Notification::TurnPlanUpdated(_) => "TurnPlanUpdated",
                    Notification::TurnDiffUpdated(_) => "TurnDiffUpdated",
                    Notification::ReasoningSummaryPartAdded(_) => "ReasoningSummaryPartAdded",
                    Notification::ReasoningTextDelta(_) => "ReasoningTextDelta",
                    Notification::AccountLoginCompleted(_) => "AccountLoginCompleted",
                    Notification::DeprecationNotice(_) => "DeprecationNotice",
                    Notification::GuardianWarning(_) => "GuardianWarning",
                    Notification::Warning(_) => "Warning",
                    Notification::ThreadArchived(_) => "ThreadArchived",
                    Notification::ThreadClosed(_) => "ThreadClosed",
                    Notification::ThreadUnarchived(_) => "ThreadUnarchived",
                    Notification::ThreadGoalCleared(_) => "ThreadGoalCleared",
                    Notification::ThreadNameUpdated(_) => "ThreadNameUpdated",
                    Notification::SkillsChanged(_) => "SkillsChanged",
                    Notification::FsChanged(_) => "FsChanged",
                    Notification::ConfigWarning(_) => "ConfigWarning",
                    Notification::AccountUpdated(_) => "AccountUpdated",
                    Notification::AppListUpdated(_) => "AppListUpdated",
                    Notification::CommandExecOutputDelta(_) => "CommandExecOutputDelta",
                    Notification::ExternalAgentConfigImportCompleted(_) => {
                        "ExternalAgentConfigImportCompleted"
                    }
                    Notification::FuzzyFileSearchSessionCompleted(_) => {
                        "FuzzyFileSearchSessionCompleted"
                    }
                    Notification::FuzzyFileSearchSessionUpdated(_) => {
                        "FuzzyFileSearchSessionUpdated"
                    }
                    Notification::HookCompleted(_) => "HookCompleted",
                    Notification::HookStarted(_) => "HookStarted",
                    Notification::ItemGuardianApprovalReviewCompleted(_) => {
                        "ItemGuardianApprovalReviewCompleted"
                    }
                    Notification::ItemGuardianApprovalReviewStarted(_) => {
                        "ItemGuardianApprovalReviewStarted"
                    }
                    Notification::TerminalInteraction(_) => "TerminalInteraction",
                    Notification::McpToolCallProgress(_) => "McpToolCallProgress",
                    Notification::ModelRerouted(_) => "ModelRerouted",
                    Notification::ModelVerification(_) => "ModelVerification",
                    Notification::ProcessExited(_) => "ProcessExited",
                    Notification::ProcessOutputDelta(_) => "ProcessOutputDelta",
                    Notification::ServerRequestResolved(_) => "ServerRequestResolved",
                    Notification::ContextCompacted(_) => "ContextCompacted",
                    Notification::ThreadGoalUpdated(_) => "ThreadGoalUpdated",
                    Notification::ThreadRealtimeClosed(_) => "ThreadRealtimeClosed",
                    Notification::ThreadRealtimeError(_) => "ThreadRealtimeError",
                    Notification::ThreadRealtimeItemAdded(_) => "ThreadRealtimeItemAdded",
                    Notification::ThreadRealtimeOutputAudioDelta(_) => {
                        "ThreadRealtimeOutputAudioDelta"
                    }
                    Notification::ThreadRealtimeSdp(_) => "ThreadRealtimeSdp",
                    Notification::ThreadRealtimeStarted(_) => "ThreadRealtimeStarted",
                    Notification::ThreadRealtimeTranscriptDelta(_) => {
                        "ThreadRealtimeTranscriptDelta"
                    }
                    Notification::ThreadRealtimeTranscriptDone(_) => "ThreadRealtimeTranscriptDone",
                    Notification::WindowsWorldWritableWarning(_) => "WindowsWorldWritableWarning",
                    Notification::WindowsSandboxSetupCompleted(_) => "WindowsSandboxSetupCompleted",
                    Notification::Unknown { method, .. } => {
                        *unknown_methods.entry(method.clone()).or_default() += 1;
                        // Skips the typed tally. The loop condition still enforces the cap.
                        continue;
                    }
                };
                *typed_counts.entry(variant).or_default() += 1;
                if matches!(&n, Notification::TurnCompleted(_)) {
                    break;
                }
            }
            ServerMessage::Request { id, request } => {
                if request.is_unknown() {
                    *unknown_methods
                        .entry(request.method().to_string())
                        .or_default() += 1;
                }
                client
                    .respond(id, &serde_json::json!({"decision": "accept"}))
                    .await
                    .expect("respond");
            }
        }
    }
    eprintln!("\n── Typed message audit ──────────────────────────────────");
    eprintln!("Typed variants seen ({} kinds):", typed_counts.len());
    for (variant, n) in &typed_counts {
        eprintln!(" {:4}× Notification::{}", n, variant);
    }
    eprintln!("─────────────────────────────────────────────────────────\n");
    assert!(
        unknown_methods.is_empty(),
        "Wire methods with no typed binding (audit must be empty): {:?}",
        unknown_methods
    );
    client.shutdown().await.expect("shutdown");
}
/// End-to-end check that the agent can author a compilable source file.
///
/// Steps:
/// 1. Create a fresh, timestamp-named temp directory and `git init` it.
/// 2. Spawn the app-server with that directory as its working directory.
/// 3. Ask the agent to write a self-contained `quicksort.rs`, answering every
///    server request with `{"decision": "accept"}`.
/// 4. Compile the result with `rustc --edition 2021`.
///
/// The scratch directory is removed only at the very end, so a failing run
/// leaves its files behind for inspection.
///
/// # Panics
///
/// Panics in any of these cases:
/// - no message arrives within 90 s;
/// - the stream ends or errors before `turn/completed`;
/// - more than 5000 messages arrive;
/// - `quicksort.rs` is missing;
/// - `rustc` fails.
#[tokio::test]
async fn test_async_client_writes_compilable_quicksort() {
    use std::process::Command;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};
    use tokio::time::timeout;

    let _ = env_logger::builder().is_test(true).try_init();

    // Timestamp suffix so repeated runs get distinct scratch directories.
    let nonce = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let scratch = std::env::temp_dir().join(format!("codex-codes-quicksort-{nonce}"));
    let _ = std::fs::remove_dir_all(&scratch);
    std::fs::create_dir_all(&scratch).expect("create scratch dir");

    // NOTE(review): presumably the agent expects to operate inside a git
    // repository; confirm why `git init` is required.
    let git_status = Command::new("git")
        .current_dir(&scratch)
        .args(["init", "-q"])
        .status()
        .expect("spawn git init");
    assert!(
        git_status.success(),
        "git init failed in {}",
        scratch.display()
    );

    let builder = AppServerBuilder::new().working_directory(&scratch);
    let mut client = AsyncClient::spawn(builder)
        .await
        .expect("spawn app-server");

    let init_params = InitializeParams {
        client_info: ClientInfo {
            name: "codex-codes-test".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
            title: Some("Quicksort Integration".to_string()),
        },
        capabilities: None,
    };
    client.initialize(&init_params).await.expect("initialize");

    let thread_params: ThreadStartParams =
        serde_json::from_value(serde_json::json!({})).unwrap();
    let thread = client
        .thread_start(&thread_params)
        .await
        .expect("thread_start");

    let prompt = "Create a file named `quicksort.rs` in the current working directory \
        containing a self-contained Rust program: an in-place `quicksort` function over \
        `[i32]`, plus a `main` that sorts a small literal slice and prints the result. \
        Do not create a `Cargo.toml` and do not use any external crates. The file MUST \
        compile with: `rustc --edition 2021 quicksort.rs -o /dev/null`.";
    let turn_params = TurnStartParams {
        thread_id: thread.thread.id.clone(),
        input: vec![UserInput::Text {
            text: prompt.to_string(),
            text_elements: None,
        }],
        approval_policy: None,
        approvals_reviewer: None,
        cwd: None,
        effort: None,
        model: None,
        output_schema: None,
        personality: None,
        sandbox_policy: None,
        service_tier: None,
        summary: None,
    };
    client.turn_start(&turn_params).await.expect("turn_start");

    // Maximum silence tolerated between two consecutive server messages.
    let idle_limit = Duration::from_secs(90);
    let mut approvals_seen = 0u32;
    let mut message_count = 0u32;
    let mut latest_request: Option<String> = None;
    loop {
        let msg = match timeout(idle_limit, client.next_message()).await {
            Err(_) => panic!(
                "no message for {}s — possible hang \
                (messages={message_count}, approvals={approvals_seen}, last_approval={:?})",
                idle_limit.as_secs(),
                latest_request
            ),
            Ok(Err(e)) => panic!("read error after {message_count} messages: {e}"),
            Ok(Ok(None)) => panic!(
                "app-server stream ended before TurnCompleted \
                (messages={message_count}, approvals={approvals_seen}, last_approval={:?})",
                latest_request
            ),
            Ok(Ok(Some(m))) => m,
        };
        message_count += 1;

        match msg {
            ServerMessage::Notification(Notification::TurnCompleted(_)) => break,
            ServerMessage::Notification(_) => {}
            ServerMessage::Request { id, request } => {
                approvals_seen += 1;
                let summary = match &request {
                    ServerRequest::CmdExecApproval(p) => format!(
                        "cmdExec item={} cwd={:?} cmd={:?} reason={:?}",
                        p.item_id, p.cwd, p.command, p.reason
                    ),
                    ServerRequest::FileChangeApproval(p) => {
                        format!("fileChange item={} reason={:?}", p.item_id, p.reason)
                    }
                    ServerRequest::Unknown { method, .. } => format!("unknown method={method}"),
                    other => format!("{} (unhandled)", other.method()),
                };
                eprintln!("[approval #{approvals_seen}] {summary}");
                latest_request = Some(summary);
                client
                    .respond(id, &serde_json::json!({"decision": "accept"}))
                    .await
                    .expect("respond to approval");
            }
        }

        assert!(
            message_count <= 5000,
            "too many messages without TurnCompleted (count={message_count}, \
            approvals={approvals_seen})"
        );
    }
    client.shutdown().await.expect("shutdown");

    let source_file = scratch.join("quicksort.rs");
    assert!(
        source_file.exists(),
        "quicksort.rs not written to {}",
        scratch.display()
    );

    let binary_path = scratch.join("quicksort_bin");
    let rustc_output = Command::new("rustc")
        .current_dir(&scratch)
        .args(["--edition", "2021", "quicksort.rs", "-o"])
        .arg(&binary_path)
        .output()
        .expect("spawn rustc");
    assert!(
        rustc_output.status.success(),
        "rustc failed for {}:\n--- stderr ---\n{}\n--- stdout ---\n{}",
        source_file.display(),
        String::from_utf8_lossy(&rustc_output.stderr),
        String::from_utf8_lossy(&rustc_output.stdout)
    );

    let _ = std::fs::remove_dir_all(&scratch);
}