use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use tinyagents::CancellationToken;
use tinyagents::TinyAgentsError;
use tinyagents::harness::context::{RunConfig, RunContext};
use tinyagents::harness::message::Message;
use tinyagents::harness::middleware::Middleware;
use tinyagents::harness::model::ModelDelta;
use tinyagents::harness::runtime::AgentHarness;
use tinyagents::harness::testkit::{EventRecorder, StreamingMock};
/// Test middleware that keeps a log of the text carried by each model delta
/// it is handed (see its `Middleware` impl in this file).
struct DeltaRecorder {
    /// Shared, lock-protected list of recorded delta contents, in the order
    /// they were pushed. The test keeps its own `Arc` handle to inspect it.
    texts: Arc<Mutex<Vec<String>>>,
}
#[async_trait]
impl Middleware<(), ()> for DeltaRecorder {
    /// Identifier this middleware reports for itself.
    fn name(&self) -> &str {
        "delta-recorder"
    }

    /// Appends a copy of `delta.content` to the shared `texts` log and leaves
    /// the delta itself untouched.
    ///
    /// # Panics
    /// Panics if the `texts` mutex has been poisoned.
    async fn on_model_delta(
        &self,
        _ctx: &mut RunContext<()>,
        _state: &(),
        delta: &mut ModelDelta,
    ) -> tinyagents::Result<()> {
        {
            // Keep the guard in its own scope so the lock is released before returning.
            let mut seen = self.texts.lock().unwrap();
            seen.push(delta.content.clone());
        }
        Ok(())
    }
}
/// Runs a streaming invocation against a mock model fed three text chunks and
/// checks that:
/// - exactly one model call is reported,
/// - the run's text equals the chunks concatenated,
/// - the `DeltaRecorder` middleware saw each chunk, in order,
/// - the event recorder captured three `model.delta` event kinds.
#[tokio::test]
async fn streaming_invoke_fires_on_model_delta_and_accumulates_final_response() {
    // Shared log: one handle goes to the middleware, the other stays here.
    let texts = Arc::new(Mutex::new(Vec::new()));
    let delta_middleware = Arc::new(DeltaRecorder {
        texts: Arc::clone(&texts),
    });

    let streaming_model = Arc::new(StreamingMock::from_text_chunks(["Hel", "lo, ", "world"]));

    let mut harness: AgentHarness<()> = AgentHarness::new();
    harness.register_model("stream", streaming_model);
    harness.push_middleware(delta_middleware);

    let recorder = EventRecorder::new();
    let config = RunConfig::new("stream-run");
    let ctx = RunContext::new(config, ()).with_events(recorder.sink());

    let prompt = vec![Message::user("hi")];
    let outcome = harness.invoke_streaming_in_context(&(), ctx, prompt).await;
    let run = outcome.expect("streaming run succeeds");

    assert_eq!(run.model_calls, 1);
    assert_eq!(run.text(), Some(String::from("Hello, world")));

    let expected_chunks = vec![
        String::from("Hel"),
        String::from("lo, "),
        String::from("world"),
    ];
    assert_eq!(*texts.lock().unwrap(), expected_chunks);

    let delta_events = recorder
        .kinds()
        .into_iter()
        .filter(|kind| kind == "model.delta")
        .count();
    assert_eq!(delta_events, 3, "one model.delta event per streamed delta");
}
/// Starts a streaming invocation whose context carries a cancellation token
/// that was cancelled before the call, and checks that the call returns
/// `TinyAgentsError::Cancelled` instead of a completed run.
#[tokio::test]
async fn pre_cancelled_token_yields_cancelled_on_streaming_path() {
    let streaming_model = Arc::new(StreamingMock::from_text_chunks(["never", "delivered"]));
    let mut harness: AgentHarness<()> = AgentHarness::new();
    harness.register_model("stream", streaming_model);

    // Cancel up front so the run begins with an already-cancelled token.
    let token = CancellationToken::new();
    token.cancel();

    let config = RunConfig::new("cancel-stream");
    let ctx = RunContext::new(config, ()).with_cancellation(token);

    let prompt = vec![Message::user("hi")];
    let outcome = harness.invoke_streaming_in_context(&(), ctx, prompt).await;
    let err = outcome.expect_err("a pre-cancelled streaming run must not complete");

    let was_cancelled = matches!(err, TinyAgentsError::Cancelled);
    assert!(was_cancelled, "got {err:?}");
}