codex-client-sdk 0.107.0

Rust SDK for embedding the Codex agent via CLI-over-JSONL transport
Documentation
mod helpers;

use std::collections::HashMap;
use std::time::Duration;

use codex::{CodexOptions, Error, TurnOptions};
use futures::StreamExt;
use serde_json::{Value, json};
use tokio_util::sync::CancellationToken;

use crate::helpers::MockCodexHarness;

fn success_events() -> Vec<Value> {
    vec![
        json!({ "type": "thread.started", "thread_id": "thread_1" }),
        json!({ "type": "turn.started" }),
        json!({
            "type": "item.completed",
            "item": { "id": "item_1", "type": "agent_message", "text": "Hi!" }
        }),
        json!({
            "type": "turn.completed",
            "usage": { "input_tokens": 42, "cached_input_tokens": 12, "output_tokens": 5 }
        }),
    ]
}

fn infinite_mode_options() -> CodexOptions {
    let mut options = CodexOptions::default();
    let mut env = HashMap::new();
    env.insert("CODEX_MOCK_INFINITE".to_string(), "1".to_string());
    env.insert("CODEX_MOCK_STREAM_DELAY_MS".to_string(), "20".to_string());
    options.env = Some(env);
    options
}

#[tokio::test]
async fn aborts_run_when_token_is_already_cancelled() {
    let harness = MockCodexHarness::new(vec![success_events()]);
    let codex = harness.codex(CodexOptions::default()).expect("codex");
    let thread = codex.start_thread(None);

    let token = CancellationToken::new();
    token.cancel();
    let error = thread
        .run(
            "Hello, world!",
            Some(TurnOptions {
                cancellation_token: Some(token),
                ..Default::default()
            }),
        )
        .await
        .expect_err("must fail");

    assert!(matches!(error, Error::Cancelled));
}

#[tokio::test]
async fn aborts_run_streamed_when_token_is_already_cancelled() {
    let harness = MockCodexHarness::new(vec![success_events()]);
    let codex = harness.codex(CodexOptions::default()).expect("codex");
    let thread = codex.start_thread(None);

    let token = CancellationToken::new();
    token.cancel();
    let error = match thread
        .run_streamed(
            "Hello, world!",
            Some(TurnOptions {
                cancellation_token: Some(token),
                ..Default::default()
            }),
        )
        .await
    {
        Ok(_) => panic!("expected cancellation error"),
        Err(error) => error,
    };

    assert!(matches!(error, Error::Cancelled));
}

#[tokio::test]
async fn aborts_run_when_token_is_cancelled_during_execution() {
    let harness = MockCodexHarness::new(vec![Vec::new()]);
    let codex = harness.codex(infinite_mode_options()).expect("codex");
    let thread = codex.start_thread(None);

    let token = CancellationToken::new();
    let token_for_task = token.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(80)).await;
        token_for_task.cancel();
    });

    let error = thread
        .run(
            "Hello, world!",
            Some(TurnOptions {
                cancellation_token: Some(token),
                ..Default::default()
            }),
        )
        .await
        .expect_err("must fail");

    assert!(matches!(error, Error::Cancelled));
}

#[tokio::test]
async fn aborts_run_streamed_when_token_is_cancelled_during_iteration() {
    let harness = MockCodexHarness::new(vec![Vec::new()]);
    let codex = harness.codex(infinite_mode_options()).expect("codex");
    let thread = codex.start_thread(None);

    let token = CancellationToken::new();
    let streamed = thread
        .run_streamed(
            "Hello, world!",
            Some(TurnOptions {
                cancellation_token: Some(token.clone()),
                ..Default::default()
            }),
        )
        .await
        .expect("streamed");

    let mut events = streamed.events;
    let mut seen = 0usize;
    loop {
        let next = events.next().await.expect("stream should continue");
        match next {
            Ok(_) => {
                seen += 1;
                if seen == 5 {
                    token.cancel();
                }
            }
            Err(error) => {
                assert!(matches!(error, Error::Cancelled));
                break;
            }
        }
    }
}

#[tokio::test]
async fn completes_normally_when_token_is_not_cancelled() {
    let harness = MockCodexHarness::new(vec![success_events()]);
    let codex = harness.codex(CodexOptions::default()).expect("codex");
    let thread = codex.start_thread(None);

    let token = CancellationToken::new();
    let result = thread
        .run(
            "Hello, world!",
            Some(TurnOptions {
                cancellation_token: Some(token),
                ..Default::default()
            }),
        )
        .await
        .expect("run");

    assert_eq!(result.final_response, "Hi!");
    assert_eq!(result.items.len(), 1);
}