codex-client-sdk 0.107.0

Rust SDK for embedding the Codex agent via CLI-over-JSONL transport
Documentation
mod helpers;

use codex::{CodexOptions, ThreadEvent, TurnOptions};
use futures::StreamExt;
use serde_json::{Value, json};

use crate::helpers::MockCodexHarness;

fn success_events(thread_id: Option<&str>, text: &str, item_id: &str) -> Vec<Value> {
    let mut events = Vec::new();
    if let Some(thread_id) = thread_id {
        events.push(json!({
            "type": "thread.started",
            "thread_id": thread_id
        }));
    }
    events.push(json!({ "type": "turn.started" }));
    events.push(json!({
        "type": "item.completed",
        "item": {
            "id": item_id,
            "type": "agent_message",
            "text": text
        }
    }));
    events.push(json!({
        "type": "turn.completed",
        "usage": {
            "input_tokens": 42,
            "cached_input_tokens": 12,
            "output_tokens": 5
        }
    }));
    events
}

#[tokio::test]
async fn returns_thread_events() {
    let harness = MockCodexHarness::new(vec![success_events(Some("thread_1"), "Hi!", "item_1")]);
    let codex = harness.codex(CodexOptions::default()).expect("codex");
    let thread = codex.start_thread(None);

    let streamed = thread
        .run_streamed("Hello, world!", None)
        .await
        .expect("streamed");

    let mut events = Vec::new();
    let mut stream = streamed.events;
    while let Some(event) = stream.next().await {
        events.push(event.expect("event"));
    }

    assert_eq!(
        events,
        vec![
            ThreadEvent::ThreadStarted {
                thread_id: "thread_1".to_string()
            },
            ThreadEvent::TurnStarted,
            ThreadEvent::ItemCompleted {
                item: codex::ThreadItem::AgentMessage(codex::AgentMessageItem {
                    id: "item_1".to_string(),
                    text: "Hi!".to_string()
                })
            },
            ThreadEvent::TurnCompleted {
                usage: codex::Usage {
                    input_tokens: 42,
                    cached_input_tokens: 12,
                    output_tokens: 5
                }
            }
        ]
    );
    assert_eq!(thread.id().as_deref(), Some("thread_1"));
}

#[tokio::test]
async fn sends_resume_when_run_streamed_is_called_twice() {
    let harness = MockCodexHarness::new(vec![
        success_events(Some("thread_1"), "First response", "item_1"),
        success_events(None, "Second response", "item_2"),
    ]);
    let codex = harness.codex(CodexOptions::default()).expect("codex");
    let thread = codex.start_thread(None);

    let mut first = thread
        .run_streamed("first input", None)
        .await
        .expect("first")
        .events;
    while first.next().await.is_some() {}

    let mut second = thread
        .run_streamed("second input", None)
        .await
        .expect("second")
        .events;
    while second.next().await.is_some() {}

    let logs = harness.logs();
    assert_eq!(logs.len(), 2);
    assert!(
        logs[1]
            .args
            .windows(2)
            .any(|window| window[0] == "resume" && window[1] == "thread_1")
    );
}

#[tokio::test]
async fn resumes_thread_by_id_when_streaming() {
    let harness = MockCodexHarness::new(vec![
        success_events(Some("thread_1"), "First response", "item_1"),
        success_events(None, "Second response", "item_2"),
    ]);
    let codex = harness.codex(CodexOptions::default()).expect("codex");

    let original = codex.start_thread(None);
    let mut first = original
        .run_streamed("first input", None)
        .await
        .expect("first")
        .events;
    while first.next().await.is_some() {}

    let resumed = codex.resume_thread(original.id().expect("id"), None);
    let mut second = resumed
        .run_streamed("second input", None)
        .await
        .expect("second")
        .events;
    while second.next().await.is_some() {}

    assert_eq!(resumed.id(), original.id());
    let logs = harness.logs();
    assert!(
        logs[1]
            .args
            .windows(2)
            .any(|window| window[0] == "resume" && window[1] == "thread_1")
    );
}

#[tokio::test]
async fn applies_output_schema_turn_options_when_streaming() {
    let harness = MockCodexHarness::new(vec![success_events(
        Some("thread_1"),
        "Structured",
        "item_1",
    )]);
    let codex = harness.codex(CodexOptions::default()).expect("codex");
    let thread = codex.start_thread(None);

    let schema = json!({
        "type": "object",
        "properties": { "answer": { "type": "string" } },
        "required": ["answer"],
        "additionalProperties": false
    });

    let mut events = thread
        .run_streamed(
            "structured",
            Some(TurnOptions {
                output_schema: Some(schema.clone()),
                ..Default::default()
            }),
        )
        .await
        .expect("streamed")
        .events;
    while events.next().await.is_some() {}

    let logs = harness.logs();
    assert_eq!(logs[0].output_schema, Some(schema));
}