unified-agent-api 0.3.5

Agent-agnostic facade and registry for wrapper backends
Documentation
#![cfg(feature = "codex")]

use std::{
    collections::BTreeMap,
    path::PathBuf,
    pin::Pin,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use agent_api::{
    backends::codex::{CodexBackend, CodexBackendConfig},
    AgentWrapperBackend, AgentWrapperEvent, AgentWrapperEventKind, AgentWrapperRunRequest,
};
use futures_core::Stream;

fn fake_codex_binary() -> PathBuf {
    PathBuf::from(env!("CARGO_BIN_EXE_fake_codex_stream_json_agent_api"))
}

fn any_event_contains(events: &[AgentWrapperEvent], needle: &str) -> bool {
    events.iter().any(|ev| {
        ev.message
            .as_deref()
            .is_some_and(|message| message.contains(needle))
            || ev.text.as_deref().is_some_and(|text| text.contains(needle))
    })
}

async fn drain_to_none(
    mut stream: Pin<&mut (dyn Stream<Item = AgentWrapperEvent> + Send)>,
    timeout: Duration,
) -> Vec<AgentWrapperEvent> {
    let mut out = Vec::new();
    let deadline = tokio::time::sleep(timeout);
    tokio::pin!(deadline);

    loop {
        tokio::select! {
            _ = &mut deadline => break,
            item = std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)) => {
                match item {
                    Some(ev) => out.push(ev),
                    None => break,
                }
            },
        }
    }

    out
}

fn unique_temp_path(stem: &str) -> PathBuf {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_else(|_| Duration::from_secs(0))
        .as_nanos();
    let mut path = std::env::temp_dir();
    path.push("agent_api");
    path.push("c2");
    path.push(format!("{stem}_{}_{}.txt", std::process::id(), nanos));
    path
}

#[tokio::test]
async fn events_are_observable_before_process_exit() {
    let backend = CodexBackend::new(CodexBackendConfig {
        binary: Some(fake_codex_binary()),
        env: [(
            "FAKE_CODEX_SCENARIO".to_string(),
            "live_two_events_long_delay".to_string(),
        )]
        .into_iter()
        .collect(),
        ..Default::default()
    });

    let handle = backend
        .run(AgentWrapperRunRequest {
            prompt: "hello".to_string(),
            ..Default::default()
        })
        .await
        .unwrap();

    let mut events = handle.events;
    let mut completion = handle.completion;

    let first_event = tokio::time::timeout(
        Duration::from_secs(1),
        std::future::poll_fn(|cx| events.as_mut().poll_next(cx)),
    )
    .await
    .expect("first event arrives")
    .expect("stream yields an event");

    assert_eq!(
        first_event.kind,
        AgentWrapperEventKind::Status,
        "expected a live status event before process exit"
    );

    drop(events);

    assert!(
        tokio::time::timeout(Duration::from_millis(200), &mut completion)
            .await
            .is_err(),
        "expected completion to remain pending after observing the first event"
    );

    let completion = tokio::time::timeout(Duration::from_secs(3), &mut completion)
        .await
        .expect("completion resolves")
        .unwrap();
    assert!(completion.status.success());
}

#[tokio::test]
async fn request_env_overrides_backend_env() {
    let dump_path = unique_temp_path("env_dump");
    let dump_path_str = dump_path.to_string_lossy().to_string();

    let backend = CodexBackend::new(CodexBackendConfig {
        binary: Some(fake_codex_binary()),
        env: [
            (
                "FAKE_CODEX_SCENARIO".to_string(),
                "dump_env_then_exit".to_string(),
            ),
            ("CODEX_WRAPPER_TEST_DUMP_ENV".to_string(), dump_path_str),
            ("C2_TEST_KEY".to_string(), "config".to_string()),
            ("C2_ONLY_CONFIG".to_string(), "config-only".to_string()),
        ]
        .into_iter()
        .collect(),
        ..Default::default()
    });

    let handle = backend
        .run(AgentWrapperRunRequest {
            prompt: "hello".to_string(),
            env: [("C2_TEST_KEY".to_string(), "request".to_string())]
                .into_iter()
                .collect::<BTreeMap<_, _>>(),
            ..Default::default()
        })
        .await
        .unwrap();

    drop(handle.events);

    let completion = tokio::time::timeout(Duration::from_secs(2), handle.completion)
        .await
        .expect("completion resolves")
        .unwrap();
    assert!(completion.status.success());

    let dump = std::fs::read_to_string(&dump_path).expect("dump file exists");
    let mut vars = BTreeMap::<String, String>::new();
    for line in dump.lines() {
        let Some((key, value)) = line.split_once('=') else {
            continue;
        };
        vars.insert(key.to_string(), value.to_string());
    }

    assert_eq!(vars.get("C2_TEST_KEY").map(String::as_str), Some("request"));
    assert_eq!(
        vars.get("C2_ONLY_CONFIG").map(String::as_str),
        Some("config-only")
    );
}

#[tokio::test]
async fn redaction_does_not_leak_raw_jsonl_line() {
    let backend = CodexBackend::new(CodexBackendConfig {
        binary: Some(fake_codex_binary()),
        env: [(
            "FAKE_CODEX_SCENARIO".to_string(),
            "emit_normalize_error_with_rawline_secret".to_string(),
        )]
        .into_iter()
        .collect(),
        ..Default::default()
    });

    let handle = backend
        .run(AgentWrapperRunRequest {
            prompt: "hello".to_string(),
            ..Default::default()
        })
        .await
        .unwrap();

    let mut events = handle.events;
    let completion = handle.completion;

    let seen = drain_to_none(events.as_mut(), Duration::from_secs(2)).await;
    assert!(
        seen.iter()
            .any(|ev| ev.kind == AgentWrapperEventKind::Error),
        "expected at least one redacted Error event"
    );
    assert!(
        !any_event_contains(&seen, "RAWLINE_SECRET_DO_NOT_LEAK"),
        "expected redaction to avoid raw JSONL line content"
    );

    let completion = tokio::time::timeout(Duration::from_secs(2), completion)
        .await
        .expect("completion resolves")
        .unwrap();
    assert!(completion.status.success());
}