unified-agent-api 0.2.2

Agent-agnostic facade and registry for wrapper backends
Documentation
use super::*;

use super::support::ControlledEndStream;

#[tokio::test]
async fn cancellation_closes_events_but_does_not_accelerate_completion() {
    let adapter = std::sync::Arc::new(ToyAdapter { fail_spawn: false });
    let cancel = HarnessCancelSignal::new();

    let (backend_done_tx, backend_done_rx) = oneshot::channel::<()>();
    let completion: DynBackendCompletionFuture<_, _> = Box::pin(async move {
        backend_done_rx.await.expect("backend completion released");
        Ok::<ToyCompletion, ToyBackendError>(ToyCompletion)
    });

    let (events_finish_tx, events_finish_rx) = oneshot::channel::<()>();
    let events = ControlledEndStream::<ToyEvent, ToyBackendError> {
        first: Some(Ok::<ToyEvent, ToyBackendError>(ToyEvent::Text(
            "one".to_string(),
        ))),
        finish_rx: events_finish_rx,
    };
    let events: DynBackendEventStream<_, _> = Box::pin(events);

    let (tx, rx) = mpsc::channel::<crate::AgentWrapperEvent>(8);
    let (completion_tx, completion_rx) =
        oneshot::channel::<Result<AgentWrapperCompletion, AgentWrapperError>>();

    let pump_task = tokio::spawn(pump_backend_events_with_cancel(
        adapter.clone(),
        events,
        tx,
        cancel.clone(),
        None,
    ));
    let completion_task = tokio::spawn(send_completion_with_cancel(
        adapter,
        completion,
        cancel.clone(),
        None,
        completion_tx,
    ));

    let mut handle = crate::run_handle_gate::build_gated_run_handle(rx, completion_rx);

    let first = handle.events.next().await.expect("first event forwarded");
    assert_eq!(first.kind, AgentWrapperEventKind::TextOutput);

    cancel.cancel();

    assert!(
        tokio::time::timeout(Duration::from_secs(1), handle.events.next())
            .await
            .expect("events do not hang")
            .is_none(),
        "events stream must be closed after cancellation"
    );

    {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        assert!(matches!(
            handle.completion.as_mut().poll(&mut cx),
            Poll::Pending
        ));
    }

    let _ = backend_done_tx.send(());
    let outcome = handle.completion.await;
    assert!(
        matches!(
            outcome,
            Err(AgentWrapperError::Backend { ref message }) if message == "cancelled"
        ),
        "completion must resolve to the pinned cancelled error after backend exit"
    );

    let _ = events_finish_tx.send(());
    pump_task.await.expect("pump task completes");
    completion_task.await.expect("completion task completes");
}