1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use tokio::sync::mpsc;
use bamboo_agent_core::{AgentError, AgentEvent};
use bamboo_llm::{provider, LLMChunk};
use super::stream_state::StreamAccumulationState;
/// Applies a single item from the upstream LLM stream to the round's accumulator.
///
/// Successful chunks are dispatched by kind:
/// - `ResponseId`, `Token`, `ReasoningToken`, `ToolCalls`, `CacheUsage` and
///   `UsageSummary` each update `state` through the matching setter/appender.
/// - `Token` and `ReasoningToken` are additionally forwarded to `event_tx`
///   (when a sender was supplied) as `AgentEvent::Token` /
///   `AgentEvent::ReasoningToken`.
/// - `Done` is only logged.
///
/// # Errors
///
/// Returns `AgentError::LLM` carrying the provider error's `Display` text when
/// `chunk_result` is an `Err`. No `AgentEvent::Error` is emitted for that case
/// so that round-level retry logic still has a chance to recover.
pub(super) async fn handle_chunk_result(
    chunk_result: provider::Result<LLMChunk>,
    state: &mut StreamAccumulationState,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
    session_id: &str,
) -> Result<(), AgentError> {
    // Peel off the failure case up front so the dispatch below only ever sees
    // well-formed chunks.
    let chunk = match chunk_result {
        Ok(chunk) => chunk,
        Err(error) => {
            let message = error.to_string();
            tracing::warn!("[{}] LLM stream error: {}", session_id, message);
            // Intentionally no AgentEvent::Error here: the failure may be
            // transient and the caller's round-level retry can still recover.
            return Err(AgentError::LLM(message));
        }
    };

    match chunk {
        LLMChunk::ResponseId(response_id) => {
            tracing::debug!(
                "[{}] Received upstream response_id={}",
                session_id,
                response_id
            );
            state.set_response_id(response_id);
        }
        LLMChunk::Token(token) => {
            state.append_token(&token);
            // Awaiting `send` (rather than `try_send` or a timed send) waits for
            // channel capacity instead of discarding the token under load. It
            // fails only once the receiver has been dropped; the token is then
            // logged and discarded. After a disconnect, each further token
            // produces its own warning (issue #23).
            if let Some(tx) = event_tx {
                let outcome = tx.send(AgentEvent::Token { content: token }).await;
                if outcome.is_err() {
                    tracing::warn!(
                        "[{}] event channel closed; dropping streamed token \
                         (subscriber disconnected)",
                        session_id,
                    );
                }
            }
        }
        LLMChunk::ReasoningToken(token) => {
            state.append_reasoning_token(&token);
            // Same backpressure / disconnect handling as plain tokens above.
            if let Some(tx) = event_tx {
                let outcome = tx
                    .send(AgentEvent::ReasoningToken { content: token })
                    .await;
                if outcome.is_err() {
                    tracing::warn!(
                        "[{}] event channel closed; dropping streamed reasoning token \
                         (subscriber disconnected)",
                        session_id,
                    );
                }
            }
        }
        LLMChunk::ToolCalls(parts) => {
            tracing::trace!(
                "[{}] Received {} tool call parts",
                session_id,
                parts.len()
            );
            state.extend_tool_calls(parts);
        }
        LLMChunk::Done => {
            tracing::debug!("[{}] LLM stream completed", session_id);
        }
        LLMChunk::CacheUsage {
            cache_creation_input_tokens: created,
            cache_read_input_tokens: read,
            input_tokens: input,
        } => {
            tracing::debug!(
                "[{}] Cache usage: creation={}, read={}, input={}",
                session_id,
                created,
                read,
                input
            );
            state.record_cache(created, read, input);
        }
        LLMChunk::UsageSummary {
            output_tokens: output,
            thinking_tokens: thinking,
        } => {
            tracing::debug!(
                "[{}] Usage summary: output={}, thinking={}",
                session_id,
                output,
                thinking
            );
            state.record_usage(output, thinking);
        }
    }

    Ok(())
}