use std::time::Duration;
use futures::{stream, StreamExt};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio_util::sync::CancellationToken;
use bamboo_agent_core::tools::{FunctionCall, ToolCall};
use bamboo_agent_core::{AgentError, AgentEvent};
use bamboo_llm::provider::LLMError;
use bamboo_llm::{LLMChunk, LLMStream};
use super::consume::consume_llm_stream_internal;
use super::{consume_llm_stream, consume_llm_stream_silent};
/// Turns a fixed list of chunk results into an `LLMStream` for the tests below.
///
/// The items are handed to `stream::iter` and the resulting stream is pinned on the heap.
fn build_stream(items: Vec<bamboo_llm::provider::Result<LLMChunk>>) -> LLMStream {
    let chunks = stream::iter(items);
    Box::pin(chunks)
}
/// Streams a response id, a reasoning token, a content token and one tool call delivered as two
/// fragments, then verifies what `consume_llm_stream` accumulated and the first two events it
/// sent on the channel.
#[tokio::test]
async fn consume_llm_stream_accumulates_tokens_and_tool_calls() {
    // Both fragments carry the id "call_1"; the second one has an empty name and only
    // contributes argument text.
    let tool_call_fragment = |name: &str, arguments: &str| ToolCall {
        id: String::from("call_1"),
        tool_type: String::from("function"),
        function: FunctionCall {
            name: String::from(name),
            arguments: String::from(arguments),
        },
    };
    let chunks = vec![
        Ok(LLMChunk::ResponseId(String::from("resp_123"))),
        Ok(LLMChunk::ReasoningToken(String::from("thinking"))),
        Ok(LLMChunk::Token(String::from("hi"))),
        Ok(LLMChunk::ToolCalls(vec![tool_call_fragment("test_tool", "{")])),
        Ok(LLMChunk::ToolCalls(vec![tool_call_fragment("", "}")])),
        Ok(LLMChunk::Done),
    ];

    let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(8);
    let cancel = CancellationToken::new();
    let output = consume_llm_stream(build_stream(chunks), &event_tx, &cancel, "session-1")
        .await
        .expect("stream should succeed");

    // Plain accumulated fields.
    assert_eq!(output.response_id.as_deref(), Some("resp_123"));
    assert_eq!(output.content, "hi");
    assert_eq!(output.reasoning_content, "thinking");
    assert_eq!(output.token_count, 2);

    // The two fragments must come out as one tool call with the first fragment's name and the
    // argument text of both fragments joined.
    assert_eq!(output.tool_calls.len(), 1);
    let merged_call = &output.tool_calls[0];
    assert_eq!(merged_call.function.name, "test_tool");
    assert_eq!(merged_call.function.arguments, "{}");

    // The channel is expected to hold the reasoning event first, then the content token event.
    let first_event = event_rx.recv().await.expect("missing reasoning event");
    assert!(matches!(first_event, AgentEvent::ReasoningToken { .. }));
    let second_event = event_rx.recv().await.expect("missing token event");
    assert!(matches!(second_event, AgentEvent::Token { .. }));
}
/// Runs a one-token stream through `consume_llm_stream_silent`, which is called without any
/// event sender, and checks that the output is still accumulated.
#[tokio::test]
async fn consume_llm_stream_silent_does_not_emit_events() {
    let chunks = vec![
        Ok(LLMChunk::Token(String::from("hello"))),
        Ok(LLMChunk::Done),
    ];
    let cancel = CancellationToken::new();

    let output = consume_llm_stream_silent(build_stream(chunks), &cancel, "session-2")
        .await
        .expect("silent stream should succeed");

    // No response id, reasoning or tool-call chunk was sent, so those fields stay empty.
    assert!(output.response_id.is_none());
    assert_eq!(output.content, "hello");
    assert!(output.reasoning_content.is_empty());
    assert_eq!(output.token_count, 5);
    assert!(output.tool_calls.is_empty());
}
/// A stream whose only item is an `LLMError::Stream` must surface as `AgentError::LLM` with the
/// "Stream error: " prefix appearing exactly once, and must not put anything on the event
/// channel.
#[tokio::test]
async fn consume_llm_stream_returns_single_prefix_stream_error_message() {
    let failing_stream = build_stream(vec![Err(LLMError::Stream(String::from(
        "Transport error: error decoding response body",
    )))]);
    let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(4);
    let cancel = CancellationToken::new();

    let outcome = consume_llm_stream(failing_stream, &event_tx, &cancel, "session-3").await;

    match outcome {
        Ok(_) => panic!("stream should fail"),
        Err(AgentError::LLM(message)) => {
            assert_eq!(
                message,
                "Stream error: Transport error: error decoding response body"
            );
            // Guards against the prefix being applied twice while the error is converted.
            assert!(!message.starts_with("Stream error: Stream error:"));
        }
        Err(other) => panic!("expected AgentError::LLM, got {other:?}"),
    }

    // The sender is still alive here, so an untouched channel reports `Empty`.
    let pending_event = event_rx.try_recv();
    assert!(matches!(pending_event, Err(TryRecvError::Empty)));
}
#[tokio::test]
async fn consume_llm_stream_aborts_already_cancelled_stalled_stream() {
let stream: LLMStream = Box::pin(stream::pending());
let cancel = CancellationToken::new();
cancel.cancel();
let result = tokio::time::timeout(
std::time::Duration::from_secs(5),
consume_llm_stream_silent(stream, &cancel, "session-cancelled"),
)
.await
.expect("must not hang: cancellation should interrupt the stalled stream");
assert!(matches!(result, Err(AgentError::Cancelled)));
}
#[tokio::test]
async fn consume_llm_stream_interrupts_blocked_next_on_mid_stream_cancel() {
let stream: LLMStream = Box::pin(stream::pending());
let cancel = CancellationToken::new();
let canceller = cancel.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
canceller.cancel();
});
let result = tokio::time::timeout(
std::time::Duration::from_secs(5),
consume_llm_stream_silent(stream, &cancel, "session-cancel-mid"),
)
.await
.expect("must not hang: mid-stream cancellation should interrupt the blocked next()");
assert!(matches!(result, Err(AgentError::Cancelled)));
}
/// A stream that yields one token and then stalls forever must fail with
/// `AgentError::StreamTimeout`, within a window around the configured idle timeout.
#[tokio::test]
async fn consume_llm_stream_times_out_when_stream_stalls_after_first_chunk() {
    let idle_timeout = Duration::from_millis(80);
    // Accepted window for the measured wall-clock time of the whole call.
    let earliest = Duration::from_millis(50);
    let latest = Duration::from_millis(800);

    // One immediate token followed by a tail that never yields again.
    let first_chunk = stream::once(async { Ok::<_, LLMError>(LLMChunk::Token("first".to_string())) });
    let stream: LLMStream = Box::pin(first_chunk.chain(stream::pending()));
    let cancel = CancellationToken::new();

    let started = std::time::Instant::now();
    let result = tokio::time::timeout(
        Duration::from_secs(2),
        consume_llm_stream_internal(stream, None, &cancel, "session-timeout", idle_timeout),
    )
    .await
    .expect("idle timeout must interrupt the stalled stream, not hang");
    let elapsed = started.elapsed();

    match result {
        Ok(_) => panic!("expected AgentError::StreamTimeout, got Ok(_)"),
        Err(AgentError::StreamTimeout(_)) => {}
        Err(other) => panic!("expected AgentError::StreamTimeout, got Err({other:?})"),
    }

    assert!(
        elapsed >= earliest,
        "timeout fired too early ({elapsed:?}): the first chunk should reset the idle timer"
    );
    assert!(
        elapsed < latest,
        "timeout fired too late ({elapsed:?}): the stalled stream was not interrupted in time"
    );
}
/// Six chunks spaced 25 ms apart take longer in total than the 80 ms idle timeout, yet each gap
/// is shorter than it, so the stream must complete normally.
#[tokio::test]
async fn consume_llm_stream_idle_timeout_does_not_affect_healthy_stream() {
    let chunk_count: u32 = 6;
    let per_chunk_delay = Duration::from_millis(25);
    let idle_timeout = Duration::from_millis(80);

    // The unfold state is the index of the next chunk; the stream ends once it reaches
    // `chunk_count`.
    let slow_chunks = stream::unfold(0u32, move |index| async move {
        if index < chunk_count {
            tokio::time::sleep(per_chunk_delay).await;
            let chunk = Ok::<_, LLMError>(LLMChunk::Token(format!("chunk-{index}")));
            Some((chunk, index + 1))
        } else {
            None
        }
    });
    let stream: LLMStream = Box::pin(slow_chunks);
    let cancel = CancellationToken::new();

    let output = tokio::time::timeout(
        Duration::from_secs(3),
        consume_llm_stream_internal(stream, None, &cancel, "session-healthy", idle_timeout),
    )
    .await
    .expect("healthy stream must complete without timing out")
    .expect("stream should succeed");

    assert_eq!(output.content, "chunk-0chunk-1chunk-2chunk-3chunk-4chunk-5");
    let expected_count = output.content.chars().count();
    assert_eq!(output.token_count, expected_count);
}
/// Dropping the event receiver before consumption starts must not stop the stream from being
/// read to the end and accumulated.
#[tokio::test]
async fn consume_llm_stream_continues_when_subscriber_disconnects() {
    let chunks = vec![
        Ok(LLMChunk::ReasoningToken(String::from("think"))),
        Ok(LLMChunk::Token(String::from("hello"))),
        Ok(LLMChunk::Token(String::from("world"))),
        Ok(LLMChunk::Done),
    ];

    // Close the receiving half right away so the sender has no subscriber.
    let (event_tx, receiver) = mpsc::channel::<AgentEvent>(1);
    drop(receiver);

    let cancel = CancellationToken::new();
    let consume = consume_llm_stream(
        build_stream(chunks),
        &event_tx,
        &cancel,
        "session-disconnect",
    );

    // The outer timeout only exists so that a regression fails the test rather than blocking it.
    let output = tokio::time::timeout(Duration::from_secs(5), consume)
        .await
        .expect("stream must complete when the subscriber is disconnected, not hang")
        .expect("stream should succeed even though event sends fail");

    assert_eq!(output.reasoning_content, "think");
    assert_eq!(output.content, "helloworld");
    assert_eq!(output.token_count, 10);
}