Skip to main content

bamboo_engine/runtime/stream/
handler.rs

1use std::time::Duration;
2
3use tokio::sync::mpsc;
4use tokio_util::sync::CancellationToken;
5
6use bamboo_agent_core::tools::ToolCall;
7use bamboo_agent_core::{AgentError, AgentEvent};
8use bamboo_llm::LLMStream;
9
10mod chunk_handling;
11mod consume;
12mod stream_state;
13
/// Fallback idle timeout applied *between chunks* of an LLM stream (issue #28).
///
/// If a provider connection goes silent for longer than this window, the
/// stream is considered hung. The deadline is re-armed each time a chunk
/// arrives (see [`consume::consume_llm_stream_internal`]), so a stream that
/// keeps emitting data — for instance a thinking model that streams for
/// minutes — never hits it; only a genuine stall with no chunks at all does.
///
/// Kept as a named constant instead of a config field on purpose: the consume
/// function serves both the main stream path and several auxiliary (silent)
/// callers, and plumbing a new field through `AgentLoopConfig` and every call
/// site would be invasive for a streaming hot path. The internal function
/// still takes an `idle_timeout` argument, so the value stays overridable and
/// testable; full config wiring is deferred.
const DEFAULT_STREAM_IDLE_TIMEOUT: Duration = Duration::from_secs(2 * 60);
29
/// Value returned on success by [`consume_llm_stream`] and
/// [`consume_llm_stream_silent`].
///
/// NOTE(review): the per-field notes below are inferred from field names and
/// types only; the code that fills them in is not visible in this file
/// (presumably the `consume` / `stream_state` submodules) — confirm there.
pub struct StreamHandlingOutput {
    /// Presumably the provider-assigned response identifier; `None` when the
    /// stream did not supply one — TODO confirm.
    pub response_id: Option<String>,
    /// Presumably the accumulated assistant text — TODO confirm.
    pub content: String,
    /// Presumably the accumulated reasoning/thinking text — TODO confirm.
    pub reasoning_content: String,
    /// NOTE(review): how this relates to `output_tokens` is not visible here;
    /// verify what is being counted before relying on it.
    pub token_count: usize,
    /// Tool calls collected from the stream (presumably in arrival order —
    /// TODO confirm).
    pub tool_calls: Vec<ToolCall>,
    /// Presumably provider-reported output token usage — TODO confirm.
    pub output_tokens: u64,
    /// Presumably provider-reported thinking/reasoning token usage — TODO confirm.
    pub thinking_tokens: u64,
    /// Presumably provider-reported input tokens written to the prompt cache —
    /// TODO confirm.
    pub cache_creation_input_tokens: u64,
    /// Presumably provider-reported input tokens served from the prompt cache —
    /// TODO confirm.
    pub cache_read_input_tokens: u64,
    /// Presumably provider-reported input token usage — TODO confirm.
    pub input_tokens: u64,
}
42
43pub async fn consume_llm_stream(
44    stream: LLMStream,
45    event_tx: &mpsc::Sender<AgentEvent>,
46    cancel_token: &CancellationToken,
47    session_id: &str,
48) -> Result<StreamHandlingOutput, AgentError> {
49    consume::consume_llm_stream_internal(
50        stream,
51        Some(event_tx),
52        cancel_token,
53        session_id,
54        DEFAULT_STREAM_IDLE_TIMEOUT,
55    )
56    .await
57}
58
59pub async fn consume_llm_stream_silent(
60    stream: LLMStream,
61    cancel_token: &CancellationToken,
62    session_id: &str,
63) -> Result<StreamHandlingOutput, AgentError> {
64    consume::consume_llm_stream_internal(
65        stream,
66        None,
67        cancel_token,
68        session_id,
69        DEFAULT_STREAM_IDLE_TIMEOUT,
70    )
71    .await
72}
73
74#[cfg(test)]
75mod tests;