unified-agent-api 0.2.0

Agent-agnostic facade and registry for wrapper backends
Documentation
use super::*;

use super::support::{ControlledEndStream, CountingStream};

#[tokio::test]
async fn pump_backend_events_smoke_forwards_in_order() {
    let adapter = std::sync::Arc::new(ToyAdapter { fail_spawn: false });
    let events = futures_util::stream::iter([
        Ok::<ToyEvent, ToyBackendError>(ToyEvent::Text("one".to_string())),
        Ok::<ToyEvent, ToyBackendError>(ToyEvent::Text("two".to_string())),
    ]);
    let events: DynBackendEventStream<_, _> = Box::pin(events);

    let (tx, mut rx) = mpsc::channel::<AgentWrapperEvent>(DEFAULT_EVENT_CHANNEL_CAPACITY);
    let handle = tokio::spawn(pump_backend_events(adapter, events, tx, None));

    let mut texts = Vec::<String>::new();
    while let Some(ev) = rx.recv().await {
        if ev.kind == AgentWrapperEventKind::TextOutput {
            if let Some(text) = ev.text {
                texts.push(text);
            }
        }
    }

    handle.await.expect("pump task completes");
    assert_eq!(texts, vec!["one".to_string(), "two".to_string()]);
}

#[tokio::test]
async fn pump_blocks_under_backpressure_until_receiver_polls() {
    #[derive(Default)]
    struct BackpressureAdapter {
        call_count: AtomicUsize,
        second_mapped_tx: Mutex<Option<oneshot::Sender<()>>>,
    }

    impl BackendHarnessAdapter for BackpressureAdapter {
        fn kind(&self) -> crate::AgentWrapperKind {
            toy_kind()
        }

        fn supported_extension_keys(&self) -> &'static [&'static str] {
            &[]
        }

        type Policy = ();

        fn validate_and_extract_policy(
            &self,
            _request: &crate::AgentWrapperRunRequest,
        ) -> Result<Self::Policy, crate::AgentWrapperError> {
            Ok(())
        }

        type BackendEvent = String;
        type BackendCompletion = ();
        type BackendError = ();

        fn spawn(
            &self,
            _req: contract::NormalizedRequest<Self::Policy>,
        ) -> Pin<
            Box<
                dyn Future<
                        Output = Result<
                            contract::BackendSpawn<
                                Self::BackendEvent,
                                Self::BackendCompletion,
                                Self::BackendError,
                            >,
                            Self::BackendError,
                        >,
                    > + Send
                    + 'static,
            >,
        > {
            panic!("spawn unused in pump tests");
        }

        fn map_event(&self, event: Self::BackendEvent) -> Vec<crate::AgentWrapperEvent> {
            let mapped = self.call_count.fetch_add(1, Ordering::SeqCst);
            if mapped == 1 {
                if let Some(tx) = self.second_mapped_tx.lock().unwrap().take() {
                    let _ = tx.send(());
                }
            }

            vec![crate::AgentWrapperEvent {
                agent_kind: toy_kind(),
                kind: AgentWrapperEventKind::TextOutput,
                channel: Some("assistant".to_string()),
                text: Some(event),
                message: None,
                data: None,
            }]
        }

        fn map_completion(
            &self,
            _completion: Self::BackendCompletion,
        ) -> Result<crate::AgentWrapperCompletion, crate::AgentWrapperError> {
            panic!("map_completion unused in pump tests");
        }

        fn redact_error(
            &self,
            _phase: BackendHarnessErrorPhase,
            _err: &Self::BackendError,
        ) -> String {
            "unused".to_string()
        }
    }

    let (second_mapped_tx, second_mapped_rx) = oneshot::channel::<()>();
    let adapter = std::sync::Arc::new(BackpressureAdapter {
        call_count: AtomicUsize::new(0),
        second_mapped_tx: Mutex::new(Some(second_mapped_tx)),
    });

    let events = futures_util::stream::iter([
        Ok::<String, ()>("one".to_string()),
        Ok::<String, ()>("two".to_string()),
    ]);
    let events: DynBackendEventStream<_, _> = Box::pin(events);

    let (tx, mut rx) = mpsc::channel::<crate::AgentWrapperEvent>(1);
    let handle = tokio::spawn(pump_backend_events(adapter, events, tx, None));

    second_mapped_rx.await.expect("second event mapped");
    tokio::task::yield_now().await;
    assert!(
        !handle.is_finished(),
        "pump must be blocked on bounded send"
    );

    let mut texts = Vec::<String>::new();
    while let Some(ev) = rx.recv().await {
        if ev.kind == AgentWrapperEventKind::TextOutput {
            if let Some(text) = ev.text {
                texts.push(text);
            }
        }
    }

    handle.await.expect("pump task completes");
    assert_eq!(texts, vec!["one".to_string(), "two".to_string()]);
}

#[tokio::test]
async fn pump_stops_forwarding_after_receiver_drop_but_drains_to_end() {
    let total = 20usize;
    let consumed = std::sync::Arc::new(AtomicUsize::new(0));
    let items: VecDeque<Result<ToyEvent, ToyBackendError>> = (0..total)
        .map(|i| Ok::<ToyEvent, ToyBackendError>(ToyEvent::Text(format!("ev-{i}"))))
        .collect();

    let events = CountingStream {
        items,
        consumed: consumed.clone(),
    };
    let events: DynBackendEventStream<_, _> = Box::pin(events);

    let adapter = std::sync::Arc::new(ToyAdapter { fail_spawn: false });
    let (tx, mut rx) = mpsc::channel::<crate::AgentWrapperEvent>(1);
    let handle = tokio::spawn(pump_backend_events(adapter, events, tx, None));

    let first = rx.recv().await.expect("at least one forwarded event");
    assert_eq!(first.kind, AgentWrapperEventKind::TextOutput);
    drop(rx);

    handle.await.expect("pump task completes");
    assert_eq!(
        consumed.load(Ordering::SeqCst),
        total,
        "backend stream must be fully drained after receiver drop"
    );
}

#[tokio::test]
async fn pump_enforces_bounds_before_forwarding() {
    struct BoundsAdapter;

    impl BackendHarnessAdapter for BoundsAdapter {
        fn kind(&self) -> crate::AgentWrapperKind {
            toy_kind()
        }

        fn supported_extension_keys(&self) -> &'static [&'static str] {
            &[]
        }

        type Policy = ();

        fn validate_and_extract_policy(
            &self,
            _request: &crate::AgentWrapperRunRequest,
        ) -> Result<Self::Policy, crate::AgentWrapperError> {
            Ok(())
        }

        type BackendEvent = ();
        type BackendCompletion = ();
        type BackendError = ();

        fn spawn(
            &self,
            _req: contract::NormalizedRequest<Self::Policy>,
        ) -> Pin<
            Box<
                dyn Future<
                        Output = Result<
                            contract::BackendSpawn<
                                Self::BackendEvent,
                                Self::BackendCompletion,
                                Self::BackendError,
                            >,
                            Self::BackendError,
                        >,
                    > + Send
                    + 'static,
            >,
        > {
            panic!("spawn unused in pump tests");
        }

        fn map_event(&self, _event: Self::BackendEvent) -> Vec<crate::AgentWrapperEvent> {
            vec![crate::AgentWrapperEvent {
                agent_kind: toy_kind(),
                kind: AgentWrapperEventKind::Error,
                channel: Some("error".to_string()),
                text: None,
                message: Some("a".repeat(crate::bounds::MESSAGE_BOUND_BYTES + 10)),
                data: None,
            }]
        }

        fn map_completion(
            &self,
            _completion: Self::BackendCompletion,
        ) -> Result<crate::AgentWrapperCompletion, crate::AgentWrapperError> {
            panic!("map_completion unused in pump tests");
        }

        fn redact_error(
            &self,
            _phase: BackendHarnessErrorPhase,
            _err: &Self::BackendError,
        ) -> String {
            "unused".to_string()
        }
    }

    let events = futures_util::stream::iter([Ok::<(), ()>(())]);
    let events: DynBackendEventStream<_, _> = Box::pin(events);

    let adapter = std::sync::Arc::new(BoundsAdapter);
    let (tx, mut rx) = mpsc::channel::<crate::AgentWrapperEvent>(8);
    let handle = tokio::spawn(pump_backend_events(adapter, events, tx, None));

    let ev = rx.recv().await.expect("one event forwarded");
    let message = ev.message.as_deref().expect("message present");
    assert!(message.len() <= crate::bounds::MESSAGE_BOUND_BYTES);
    assert!(message.ends_with("…(truncated)"));

    while rx.recv().await.is_some() {}
    handle.await.expect("pump task completes");
}

#[tokio::test]
async fn pump_finality_sender_dropped_only_after_backend_stream_ends() {
    let (finish_tx, finish_rx) = oneshot::channel::<()>();
    let events = ControlledEndStream::<ToyEvent, ToyBackendError> {
        first: Some(Ok::<ToyEvent, ToyBackendError>(ToyEvent::Text(
            "hello".to_string(),
        ))),
        finish_rx,
    };
    let events: DynBackendEventStream<_, _> = Box::pin(events);

    let adapter = std::sync::Arc::new(ToyAdapter { fail_spawn: false });
    let (tx, mut rx) = mpsc::channel::<crate::AgentWrapperEvent>(8);
    let handle = tokio::spawn(pump_backend_events(adapter, events, tx, None));

    let ev = rx.recv().await.expect("first event forwarded");
    assert_eq!(ev.kind, AgentWrapperEventKind::TextOutput);

    tokio::task::yield_now().await;
    assert!(
        matches!(
            rx.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        ),
        "events stream must not be final before backend stream ends"
    );
    assert!(
        !handle.is_finished(),
        "pump must not finish before stream end"
    );

    let _ = finish_tx.send(());
    handle.await.expect("pump task completes");
    assert!(
        rx.recv().await.is_none(),
        "events stream must be final after backend stream ends"
    );
}