use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tokio_util::sync::CancellationToken;
use serde_json::{json, Value};
use reqwest::Client;
use futures::StreamExt;
use crate::{Result, RuntimeError, ToolRegistry};
use crate::runtime::telemetry::{self, TelemetryLevel};
use super::sse_types::{AnthropicEvent, ContentBlock, Delta};
use super::types::{AuthState, StreamEvent, LlmEvent, SessionEvent};
use super::helpers::HelperMethods;
/// Returns the longest prefix of `s` that is at most `max` bytes long and
/// ends on a UTF-8 character boundary (so slicing can never panic).
fn truncate_at_char_boundary(s: &str, max: usize) -> &str {
    if max >= s.len() {
        return s;
    }
    // Walk backwards from `max`; index 0 is always a boundary, so this finds one.
    let cut = (0..=max)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
/// Parses the concatenated `input_json_delta` fragments of a tool_use block.
///
/// Blank (whitespace-only) input yields `{}`. Invalid JSON does not abort the
/// stream: it yields an object whose `__parse_error` key carries the parser's
/// message.
fn parse_tool_input(raw: &str) -> Value {
    if raw.trim().is_empty() {
        json!({})
    } else {
        serde_json::from_str::<Value>(raw).unwrap_or_else(|err| {
            json!({ "__parse_error": format!("invalid tool input JSON: {}", err) })
        })
    }
}
/// Mutable accumulator for one streamed Messages API response.
struct ParseState {
    // --- finished output ---
    /// Closed content blocks (`text`, `thinking`, `tool_use`) in the order
    /// they were closed.
    accumulated_content: Vec<Value>,
    /// Text received since the last flush into `accumulated_content`.
    current_text: String,

    // --- currently open thinking block ---
    in_thinking: bool,
    current_thinking: String,
    current_thinking_signature: String,

    // --- currently open tool_use block ---
    in_tool_use: bool,
    current_tool_id: String,
    current_tool_name: String,
    /// Raw `input_json_delta` fragments, parsed once the block closes.
    current_tool_input_json: String,

    // --- usage captured from message_start (fallback for message_delta) ---
    msg_start_input: Option<u64>,
    msg_start_output: Option<u64>,
    msg_start_cache_read: Option<u64>,
    msg_start_cache_create: Option<u64>,
    msg_start_cache_5m: Option<u64>,
    msg_start_cache_1h: Option<u64>,
    /// Set when a Usage event has been sent; lets `emit_residual_usage` skip.
    usage_emitted: bool,

    // --- telemetry ---
    first_event_seen: bool,
    telem_msg_id: Option<String>,
    telem_ttft: Option<u64>,
    telem_stop_reason: Option<String>,
    telem_usage: telemetry::UsageRecord,

    /// First in-stream `error` event; later ones are ignored.
    stream_error: Option<StreamError>,
}
/// An `error` event received inside an otherwise successful SSE stream.
#[derive(Debug, Clone)]
struct StreamError {
    /// Summary of the form `API stream error (<type>): <message>`.
    message: String,
    /// Whether the error type is classified as transient by
    /// `StreamError::is_retryable_type`, i.e. the request may be re-sent.
    retryable: bool,
}
impl StreamError {
    /// Reports whether an API error `type` string denotes a transient failure
    /// (overload, generic API error, or rate limit) worth retrying.
    fn is_retryable_type(error_type: &str) -> bool {
        const RETRYABLE: [&str; 3] = ["overloaded_error", "api_error", "rate_limit_error"];
        RETRYABLE.iter().any(|&kind| kind == error_type)
    }
}
/// Verdict for one streamed response, as computed by `classify_stream_outcome`.
#[derive(Debug)]
enum StreamOutcome {
    /// Transient failure; the caller may send the request again.
    Retry(String),
    /// Permanent failure; the message is returned as an error.
    Fail(String),
    /// Usable response: `{ "content": [...] }`.
    Done(Value),
}
impl ParseState {
    /// Creates an empty state: no content, no open block, no usage recorded.
    fn new() -> Self {
        Self {
            // Finished output.
            accumulated_content: Vec::new(),
            current_text: String::new(),
            // Open thinking block.
            in_thinking: false,
            current_thinking: String::new(),
            current_thinking_signature: String::new(),
            // Open tool_use block.
            in_tool_use: false,
            current_tool_id: String::new(),
            current_tool_name: String::new(),
            current_tool_input_json: String::new(),
            // Usage from message_start.
            msg_start_input: None,
            msg_start_output: None,
            msg_start_cache_read: None,
            msg_start_cache_create: None,
            msg_start_cache_5m: None,
            msg_start_cache_1h: None,
            usage_emitted: false,
            // Telemetry.
            first_event_seen: false,
            telem_msg_id: None,
            telem_ttft: None,
            telem_stop_reason: None,
            telem_usage: telemetry::UsageRecord::default(),
            // In-stream error.
            stream_error: None,
        }
    }

    /// Closes whatever block was still open when the stream ended.
    ///
    /// An open thinking block is kept only if it has text. An open tool_use
    /// block is parsed and stored, but no `ToolUse` event is sent from here.
    /// Otherwise any pending text becomes a `text` block. Pending text is
    /// always cleared.
    fn finalize(&mut self) {
        // `mem::take` on the flag both tests it and resets it to `false`.
        if std::mem::take(&mut self.in_thinking) {
            if !self.current_thinking.is_empty() {
                self.accumulated_content.push(json!({
                    "type": "thinking",
                    "thinking": self.current_thinking,
                    "signature": self.current_thinking_signature
                }));
            }
        } else if std::mem::take(&mut self.in_tool_use) {
            let input = parse_tool_input(&self.current_tool_input_json);
            self.accumulated_content.push(json!({
                "type": "tool_use",
                "id": self.current_tool_id,
                "name": self.current_tool_name,
                "input": input
            }));
        } else if !self.current_text.is_empty() {
            self.accumulated_content.push(json!({
                "type": "text",
                "text": self.current_text
            }));
        }
        self.current_text.clear();
    }
}
/// Read-only context shared by every SSE event handled during one request.
struct EventCtx<'a> {
    /// Channel for incremental LLM/session events (send failures are ignored).
    tx: &'a mpsc::UnboundedSender<StreamEvent>,
    /// Gates the telemetry bookkeeping done while parsing events.
    telemetry_level: TelemetryLevel,
    /// Start instant used for the time-to-first-event measurement.
    request_start: std::time::Instant,

    // --- cache-TTL bookkeeping ---
    /// Configured prompt-cache TTL policy.
    cache_ttl: crate::core::config::CacheTtl,
    /// Whether the outgoing request carried a 1h cache marker.
    request_has_1h_marker: bool,
    /// Shared latch so the TTL-downgrade notice is sent at most once.
    ttl_downgrade_notified: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Shared flag set once any response reports 1h cache writes.
    saw_1h_honored: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
fn process_data_line(line: &str, state: &mut ParseState, ctx: &EventCtx) {
let Some(data_part) = line.strip_prefix("data: ") else {
return;
};
if data_part.trim() == "[DONE]" {
return;
}
let event = match serde_json::from_str::<AnthropicEvent>(data_part) {
Ok(e) => e,
Err(_) => return, };
process_event(event, data_part, state, ctx);
}
/// Applies one decoded SSE event to `state`, forwarding incremental events on
/// `ctx.tx`.
///
/// * `event` – the decoded event.
/// * `raw` – the original JSON payload, used only for diagnostic logging.
/// * `state` – per-request accumulator, mutated in place.
/// * `ctx` – channel, telemetry level and cache-TTL bookkeeping.
///
/// Blocks are appended to `state.accumulated_content` when they close. Usage
/// is emitted from `message_delta`, falling back to the `message_start`
/// numbers for any zero counter.
fn process_event(event: AnthropicEvent<'_>, raw: &str, state: &mut ParseState, ctx: &EventCtx) {
    /// Moves buffered text (if any) into `accumulated_content` as a `text` block.
    fn flush_pending_text(state: &mut ParseState) {
        if !state.current_text.is_empty() {
            let text = std::mem::take(&mut state.current_text);
            state.accumulated_content.push(json!({
                "type": "text",
                "text": text
            }));
        }
    }

    if !state.first_event_seen && ctx.telemetry_level.enabled() {
        state.telem_ttft = Some(ctx.request_start.elapsed().as_millis() as u64);
        state.first_event_seen = true;
    }
    match event {
        // Every block start first flushes stray text that never saw a stop,
        // so accumulated content keeps the stream's ordering.
        AnthropicEvent::ContentBlockStart { content_block } => match content_block {
            ContentBlock::Thinking => {
                flush_pending_text(state);
                state.current_thinking.clear();
                state.current_thinking_signature.clear();
                state.in_thinking = true;
            }
            ContentBlock::ToolUse { id, name } => {
                flush_pending_text(state);
                state.current_tool_name = name.into_owned();
                state.current_tool_id = id.into_owned();
                state.current_tool_input_json.clear();
                state.in_tool_use = true;
                let _ = ctx.tx.send(StreamEvent::Llm(LlmEvent::ToolUseStart {
                    tool_name: state.current_tool_name.clone(),
                    tool_id: state.current_tool_id.clone(),
                }));
            }
            ContentBlock::Text => flush_pending_text(state),
            ContentBlock::Unknown => {}
        },
        AnthropicEvent::ContentBlockDelta { delta } => match delta {
            Delta::TextDelta { text } => {
                state.current_text.push_str(&text);
                let _ = ctx.tx.send(StreamEvent::Llm(LlmEvent::Text(text.into_owned())));
            }
            Delta::ThinkingDelta { thinking } => {
                state.current_thinking.push_str(&thinking);
                let _ = ctx.tx.send(StreamEvent::Llm(LlmEvent::Thinking(thinking.into_owned())));
            }
            Delta::SignatureDelta { signature } => {
                state.current_thinking_signature = signature.into_owned();
            }
            Delta::InputJsonDelta { partial_json } => {
                state.current_tool_input_json.push_str(&partial_json);
                let _ = ctx.tx.send(StreamEvent::Llm(LlmEvent::ToolUseDelta {
                    tool_id: state.current_tool_id.clone(),
                    delta: partial_json.into_owned(),
                }));
            }
            Delta::Unknown => {}
        },
        AnthropicEvent::ContentBlockStop => {
            if state.in_thinking {
                // Empty thinking blocks (signature only) are dropped.
                if !state.current_thinking.is_empty() {
                    state.accumulated_content.push(json!({
                        "type": "thinking",
                        "thinking": state.current_thinking,
                        "signature": state.current_thinking_signature
                    }));
                }
                state.in_thinking = false;
            } else if state.in_tool_use {
                let input = parse_tool_input(&state.current_tool_input_json);
                // `json!` serializes from a reference, so `input` can still be
                // moved into the event below without an extra clone.
                state.accumulated_content.push(json!({
                    "type": "tool_use",
                    "id": state.current_tool_id,
                    "name": state.current_tool_name,
                    "input": input
                }));
                let _ = ctx.tx.send(StreamEvent::Llm(LlmEvent::ToolUse {
                    tool_name: state.current_tool_name.clone(),
                    tool_id: state.current_tool_id.clone(),
                    input,
                }));
                state.in_tool_use = false;
            } else {
                flush_pending_text(state);
            }
        }
        AnthropicEvent::MessageDelta { delta, usage } => {
            // Recorded regardless of telemetry level: besides telemetry, it
            // feeds the empty-response check in `classify_stream_outcome`,
            // whose verdict must not depend on whether telemetry is enabled.
            if let Some(sr) = delta.and_then(|d| d.stop_reason) {
                state.telem_stop_reason = Some(sr.into_owned());
            }
            if let Some(usage) = usage {
                // Zero counters in the delta fall back to message_start values.
                let input_t = if usage.input_tokens > 0 { usage.input_tokens } else { state.msg_start_input.unwrap_or(0) };
                let output_t = if usage.output_tokens > 0 { usage.output_tokens } else { state.msg_start_output.unwrap_or(0) };
                let cache_read = if usage.cache_read_input_tokens > 0 { usage.cache_read_input_tokens } else { state.msg_start_cache_read.unwrap_or(0) };
                let cache_create = if usage.cache_creation_input_tokens > 0 { usage.cache_creation_input_tokens } else { state.msg_start_cache_create.unwrap_or(0) };
                let cache_create_5m = usage.cache_creation.as_ref()
                    .and_then(|cc| cc.ephemeral_5m_input_tokens)
                    .or(state.msg_start_cache_5m);
                let cache_create_1h = usage.cache_creation.as_ref()
                    .and_then(|cc| cc.ephemeral_1h_input_tokens)
                    .or(state.msg_start_cache_1h);
                if cache_create_1h.unwrap_or(0) > 0 {
                    ctx.saw_1h_honored.store(true, std::sync::atomic::Ordering::Relaxed);
                }
                // Warn (once, via the shared latch) when a 1h TTL was requested
                // but the cache write landed entirely in the 5m bucket.
                if ctx.cache_ttl != crate::core::config::CacheTtl::FiveMinutes
                    && ctx.request_has_1h_marker
                    && cache_create_1h.unwrap_or(0) == 0
                    && cache_create_5m.unwrap_or(0) > 0
                    && cache_read == 0
                    && !ctx.saw_1h_honored.load(std::sync::atomic::Ordering::Relaxed)
                    && !ctx.ttl_downgrade_notified.swap(true, std::sync::atomic::Ordering::Relaxed)
                {
                    tracing::warn!("1h cache TTL not honored — check account/beta support (cache_ttl config)");
                    let _ = ctx.tx.send(StreamEvent::Session(SessionEvent::Notice(
                        "⚠ 1h cache TTL not honored — check account/beta support (cache_ttl config)".to_string(),
                    )));
                }
                if input_t > 0 || output_t > 0 || cache_read > 0 || cache_create > 0 {
                    HelperMethods::log_usage(input_t, cache_read, cache_create, output_t);
                    tracing::debug!("Token Usage: {} input | {} output | {} cache_read | {} cache_create", input_t, output_t, cache_read, cache_create);
                    if ctx.telemetry_level.enabled() {
                        state.telem_usage.input = input_t;
                        state.telem_usage.output = output_t;
                        state.telem_usage.cache_read = cache_read;
                        state.telem_usage.cache_write = cache_create;
                        state.telem_usage.cache_write_5m = cache_create_5m;
                        state.telem_usage.cache_write_1h = cache_create_1h;
                        state.telem_usage.compute_hit_pct();
                    }
                    state.usage_emitted = true;
                    let _ = ctx.tx.send(StreamEvent::Session(SessionEvent::Usage {
                        input_tokens: input_t,
                        output_tokens: output_t,
                        cache_read_input_tokens: cache_read,
                        cache_creation_input_tokens: cache_create,
                        cache_creation_5m: cache_create_5m,
                        cache_creation_1h: cache_create_1h,
                        model: None,
                    }));
                }
            }
        }
        AnthropicEvent::MessageStart { message } => {
            if ctx.telemetry_level.enabled() {
                if let Some(id) = message.id {
                    state.telem_msg_id = Some(id.into_owned());
                }
            }
            // Capture only; emitting here would double-count with message_delta.
            if let Some(usage) = message.usage {
                state.msg_start_input = Some(usage.input_tokens);
                state.msg_start_output = Some(usage.output_tokens);
                state.msg_start_cache_read = Some(usage.cache_read_input_tokens);
                state.msg_start_cache_create = Some(usage.cache_creation_input_tokens);
                state.msg_start_cache_5m = usage.cache_creation.as_ref().and_then(|cc| cc.ephemeral_5m_input_tokens);
                state.msg_start_cache_1h = usage.cache_creation.as_ref().and_then(|cc| cc.ephemeral_1h_input_tokens);
                if state.msg_start_cache_1h.unwrap_or(0) > 0 {
                    ctx.saw_1h_honored.store(true, std::sync::atomic::Ordering::Relaxed);
                }
            }
        }
        AnthropicEvent::MessageStop => {}
        AnthropicEvent::Error { error } => {
            // Keep only the first error; later ones are usually consequences.
            if state.stream_error.is_none() {
                let (kind, body) = match &error {
                    Some(e) => (
                        e.error_type.as_deref().unwrap_or("error"),
                        e.message.as_deref(),
                    ),
                    None => ("error", None),
                };
                let retryable = StreamError::is_retryable_type(kind);
                let message = match body {
                    Some(m) => format!("API stream error ({kind}): {m}"),
                    None => format!("API stream error ({kind})"),
                };
                tracing::warn!(
                    retryable,
                    raw = %truncate_at_char_boundary(raw, 400),
                    "SSE error event: {message}"
                );
                state.stream_error = Some(StreamError { message, retryable });
            }
        }
        AnthropicEvent::Unknown => {
            tracing::trace!(
                "Unknown SSE event type: {}",
                truncate_at_char_boundary(raw, 200)
            );
        }
    }
}
/// Reports the usage captured from `message_start` when no usage has been
/// emitted yet (e.g. the stream died before `message_delta`).
///
/// No-op when usage was already emitted or all four counters are zero.
fn emit_residual_usage(state: &mut ParseState, ctx: &EventCtx) {
    if state.usage_emitted {
        return;
    }
    let (input_t, output_t, cache_read, cache_create) = (
        state.msg_start_input.unwrap_or(0),
        state.msg_start_output.unwrap_or(0),
        state.msg_start_cache_read.unwrap_or(0),
        state.msg_start_cache_create.unwrap_or(0),
    );
    let nothing_to_report = [input_t, output_t, cache_read, cache_create]
        .iter()
        .all(|&count| count == 0);
    if nothing_to_report {
        return;
    }
    let (write_5m, write_1h) = (state.msg_start_cache_5m, state.msg_start_cache_1h);

    HelperMethods::log_usage(input_t, cache_read, cache_create, output_t);
    tracing::debug!("Token Usage (residual, dead stream): {} input | {} output | {} cache_read | {} cache_create", input_t, output_t, cache_read, cache_create);

    if ctx.telemetry_level.enabled() {
        let usage = &mut state.telem_usage;
        usage.input = input_t;
        usage.output = output_t;
        usage.cache_read = cache_read;
        usage.cache_write = cache_create;
        usage.cache_write_5m = write_5m;
        usage.cache_write_1h = write_1h;
        usage.compute_hit_pct();
    }

    state.usage_emitted = true;
    let _ = ctx.tx.send(StreamEvent::Session(SessionEvent::Usage {
        input_tokens: input_t,
        output_tokens: output_t,
        cache_read_input_tokens: cache_read,
        cache_creation_input_tokens: cache_create,
        cache_creation_5m: write_5m,
        cache_creation_1h: write_1h,
        model: None,
    }));
}
/// Decides what to do with a finished stream.
///
/// * `stream_error` – first in-stream `error` event, if any.
/// * `content` – accumulated content blocks.
/// * `has_stop_reason` – whether `message_delta` carried a stop reason.
/// * `cancelled` – whether the caller cancelled the request.
///
/// A cancelled stream always yields whatever content was received. Otherwise
/// a stream error becomes `Retry`/`Fail` depending on its retryability, and a
/// stream with neither content nor stop reason is reported as a failure.
fn classify_stream_outcome(
    stream_error: Option<StreamError>,
    content: Vec<Value>,
    has_stop_reason: bool,
    cancelled: bool,
) -> StreamOutcome {
    match stream_error {
        Some(err) if !cancelled => {
            if err.retryable {
                StreamOutcome::Retry(err.message)
            } else {
                StreamOutcome::Fail(err.message)
            }
        }
        None if !cancelled && content.is_empty() && !has_stop_reason => StreamOutcome::Fail(
            "empty response from API — the model returned no content and no stop \
                reason. This usually means the context window was exceeded or the \
                API was overloaded. Try /compact or start a fresh session."
                .to_string(),
        ),
        _ => StreamOutcome::Done(json!({ "content": content })),
    }
}
/// Per-request options for the Anthropic streaming call.
///
/// The two flags are `Arc<AtomicBool>`s, so clones of `ApiOptions` share and
/// update the same flags.
#[derive(Debug, Clone, Default)]
pub struct ApiOptions {
    /// Presumably opts into the 1M-token context window; passed to
    /// `build_beta_header` (not visible here) — TODO confirm.
    pub use_1m_context: bool,
    /// Prompt-cache TTL policy used when placing cache breakpoints and when
    /// deciding whether to warn that a 1h TTL was not honored.
    pub cache_ttl: crate::core::config::CacheTtl,
    /// Latch swapped to `true` when the "1h cache TTL not honored" notice is
    /// sent, so the notice fires at most once across requests sharing it.
    pub ttl_downgrade_notified: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Set once any response reports 1h cache-creation tokens; suppresses the
    /// TTL-downgrade warning afterwards.
    pub saw_1h_honored: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
pub(super) struct ApiMethods;
impl ApiMethods {
#[allow(dead_code, clippy::too_many_arguments)]
pub(super) async fn call_api_stream(
auth: &Arc<RwLock<AuthState>>,
client: &Client,
model: &str,
tools: &ToolRegistry,
system_prompt: &Option<String>,
thinking_budget: u32,
messages: &[Value],
tx: mpsc::UnboundedSender<StreamEvent>,
max_retries: u32,
options: &ApiOptions,
telemetry_level: crate::runtime::telemetry::TelemetryLevel,
) -> Result<Value> {
Self::call_api_stream_inner(auth, client, model, tools, system_prompt, thinking_budget, messages, tx, &CancellationToken::new(), max_retries, options, telemetry_level).await
}
#[allow(clippy::too_many_arguments)]
#[allow(clippy::collapsible_match)]
pub(super) async fn call_api_stream_inner(
auth: &Arc<RwLock<AuthState>>,
client: &Client,
model: &str,
tools: &ToolRegistry,
system_prompt: &Option<String>,
thinking_budget: u32,
messages: &[Value],
tx: mpsc::UnboundedSender<StreamEvent>,
cancel: &CancellationToken,
max_retries: u32,
options: &ApiOptions,
telemetry_level: crate::runtime::telemetry::TelemetryLevel,
) -> Result<Value> {
let tools_schema = tools.tools_schema();
if let Some(result) = crate::runtime::openai::try_route(
model, client, &tools_schema, system_prompt, messages, &tx,
None, None, thinking_budget, cancel,
).await {
return result.map_err(|e| RuntimeError::Config(format!("openai provider: {e}")));
}
let (auth_header_name, auth_header_value, auth_type) = Self::build_auth_header(auth).await;
if auth_type == "none" {
return Err(RuntimeError::Auth(
"No Anthropic credentials. Run `synaps login` or set ANTHROPIC_API_KEY, or switch to a provider model with `/model groq/llama-3.3-70b-versatile`.".to_string()
));
}
tracing::info!(model = %model, "Starting API request");
let mut cleaned_messages = messages.to_vec();
HelperMethods::sanitize_thinking_blocks(&mut cleaned_messages);
HelperMethods::annotate_cache_breakpoint(&mut cleaned_messages, options.cache_ttl);
let thinking_level = crate::core::models::thinking_level_for_budget(thinking_budget);
let mut body = json!({
"model": model,
"max_tokens": HelperMethods::max_tokens_for_model(model),
"messages": cleaned_messages,
"tools": &*tools_schema,
"stream": true,
"thinking": if crate::core::models::model_supports_adaptive_thinking(model) {
json!({ "type": "adaptive", "display": "summarized" })
} else {
let budget = if thinking_budget == 0 { crate::core::models::DEFAULT_LEGACY_ADAPTIVE_FALLBACK } else { thinking_budget };
json!({
"type": "enabled",
"budget_tokens": budget,
"display": "summarized"
})
}
});
if crate::core::models::model_supports_adaptive_thinking(model) {
if let Some(effort) = crate::core::models::effort_for_thinking_level(thinking_level) {
body["output_config"] = json!({"effort": effort});
}
}
HelperMethods::mark_last_tool(&mut body, options.cache_ttl);
if let Some(system) = HelperMethods::build_system_blocks(&auth_type, system_prompt, options.cache_ttl) {
body["system"] = system;
}
let has_tool_marker = body["tools"].as_array().is_some_and(|t| !t.is_empty());
let has_system_marker = body.get("system").is_some();
let request_has_1h_marker = match options.cache_ttl {
crate::core::config::CacheTtl::FiveMinutes => false,
crate::core::config::CacheTtl::OneHour => {
!cleaned_messages.is_empty() || has_tool_marker || has_system_marker
}
crate::core::config::CacheTtl::Hybrid => has_tool_marker || has_system_marker,
};
tracing::trace!("Outgoing API Request Payload:\n{}", serde_json::to_string_pretty(&body).unwrap_or_default());
let body_bytes: bytes::Bytes = serde_json::to_vec(&body)
.map_err(|e| RuntimeError::ApiStatus(format!("failed to serialize request body: {}", e)))?
.into();
const MAX_429_RETRIES: u32 = 8;
let mut last_err = String::new();
let mut last_status: Option<u16> = None;
let mut last_reset_hint: Option<String> = None;
let mut non_429_attempts: u32 = 0; let mut attempt: u32 = 0;
loop {
let response = {
#[allow(unused_assignments)]
let mut response = None;
loop {
if attempt > 0 {
}
let mut req = client
.post("https://api.anthropic.com/v1/messages")
.header(auth_header_name.clone(), auth_header_value.clone())
.header("anthropic-version", "2023-06-01")
.header("content-type", "application/json");
if let Some(beta) = Self::build_beta_header(&auth_type, options, model) {
req = req.header("anthropic-beta", beta);
}
match req.body(body_bytes.clone()).send().await {
Ok(resp) => {
let status = resp.status();
if status.is_success() {
response = Some(resp);
break;
}
let is_429 = status.as_u16() == 429;
let is_retryable = matches!(status.as_u16(), 429 | 500 | 502 | 503 | 529);
let (delay, from_hdr) = telemetry::retry_delay_from_headers(resp.headers(), attempt + 1);
let reset_hint = if from_hdr {
Some(format!("{}s", delay.as_secs()))
} else {
None
};
let error_text = resp.text().await.unwrap_or_default();
let retry_exhausted = if is_429 {
attempt >= MAX_429_RETRIES
} else {
non_429_attempts >= max_retries
};
if !is_retryable || retry_exhausted {
let hint = reset_hint.as_deref().or(last_reset_hint.as_deref());
return Err(RuntimeError::ApiStatus(
crate::core::error::humanize_api_error_with_reset(
status.as_u16(),
&error_text,
hint,
)
));
}
last_status = Some(status.as_u16());
last_reset_hint = reset_hint.clone();
last_err = format!("{}: {}", status, error_text);
if !is_429 {
non_429_attempts += 1;
}
let budget = if is_429 { MAX_429_RETRIES } else { max_retries };
let retry_num = if is_429 { attempt + 1 } else { non_429_attempts };
let notice = if is_429 {
if let Some(ref hint) = reset_hint {
format!("⚠ Rate limited — resuming in {} ({}/{})", hint, retry_num, budget)
} else {
format!("⚠ Rate limited — retrying ({}/{})", retry_num, budget)
}
} else {
format!("⏳ API error, retrying ({}/{})…", retry_num, budget)
};
tracing::warn!("API retry after {:?}: {} — {}", delay, notice, last_err);
let _ = tx.send(StreamEvent::Session(SessionEvent::Notice(notice)));
tokio::time::sleep(delay).await;
if cancel.is_cancelled() {
return Err(RuntimeError::Canceled);
}
}
Err(e) => {
non_429_attempts += 1;
if non_429_attempts > max_retries {
return Err(RuntimeError::ApiStatus(crate::core::error::humanize_network_error(&e)));
}
last_err = e.to_string();
last_status = None;
let delay = Duration::from_millis(1000 * 2u64.pow(non_429_attempts.saturating_sub(1)));
tracing::warn!("API retry {}/{} after {:?}: {}", non_429_attempts, max_retries, delay, last_err);
let _ = tx.send(StreamEvent::Session(SessionEvent::Notice(
format!("⏳ API error, retrying ({}/{})…", non_429_attempts, max_retries)
)));
tokio::time::sleep(delay).await;
if cancel.is_cancelled() {
return Err(RuntimeError::Canceled);
}
}
}
attempt += 1;
}
response.ok_or_else(|| {
let hint = last_reset_hint.as_deref();
let status = last_status.unwrap_or(0);
if status == 429 {
RuntimeError::ApiStatus(
crate::core::error::humanize_api_error_with_reset(429, &last_err, hint)
)
} else {
RuntimeError::Tool(format!("API failed after retries: {}", last_err))
}
})?
};
let request_start = std::time::Instant::now();
let telem_request_id = if telemetry_level.enabled() {
telemetry::request_id_from_headers(response.headers())
} else {
None
};
let telem_ratelimit = if telemetry_level == TelemetryLevel::Full {
let rl = telemetry::ratelimit_from_headers(response.headers());
if rl.is_empty() { None } else { Some(rl) }
} else {
None
};
let mut stream = response.bytes_stream();
tracing::debug!("Stream opened");
let mut state = ParseState::new();
let ctx = EventCtx {
tx: &tx,
telemetry_level,
request_start,
cache_ttl: options.cache_ttl,
ttl_downgrade_notified: options.ttl_downgrade_notified.clone(),
saw_1h_honored: options.saw_1h_honored.clone(),
request_has_1h_marker,
};
let mut line_buffer = super::sse::SseLineBuffer::new();
while let Some(chunk) = stream.next().await {
if cancel.is_cancelled() {
break;
}
let chunk = match chunk {
Ok(c) => c,
Err(e) => {
emit_residual_usage(&mut state, &ctx);
return Err(RuntimeError::ApiStatus(crate::core::error::humanize_network_error(&e)));
}
};
line_buffer.extend(&chunk);
while let Some(line) = line_buffer.next_line() {
process_data_line(line, &mut state, &ctx);
}
}
let remaining = line_buffer.take_remaining().unwrap_or_default();
process_data_line(&remaining, &mut state, &ctx);
state.finalize();
emit_residual_usage(&mut state, &ctx);
let stream_error = state.stream_error.take();
let has_stop_reason = state.telem_stop_reason.is_some();
let cancelled = cancel.is_cancelled();
if telemetry_level.enabled() {
let breakpoints: Vec<usize> = cleaned_messages.iter().enumerate()
.filter(|(_, m)| {
if let Some(arr) = m["content"].as_array() {
arr.last().and_then(|b| b.get("cache_control")).is_some()
} else {
false
}
})
.map(|(i, _)| i)
.collect();
let system_bytes = system_prompt.as_ref().map(|s| s.len()).unwrap_or(0);
let record = telemetry::TelemetryRecord {
ts: telemetry::TelemetryRecord::now_ms(),
request_id: telem_request_id,
msg_id: state.telem_msg_id,
model: model.to_string(),
attempt: 1, ttft_ms: state.telem_ttft,
total_ms: request_start.elapsed().as_millis() as u64,
stop_reason: state.telem_stop_reason,
usage: state.telem_usage,
ratelimit: telem_ratelimit,
cache_diag: None, context: telemetry::ContextRecord {
messages: cleaned_messages.len(),
tools: tools_schema.len(),
system_bytes,
breakpoints,
},
};
telemetry::write_record(&record);
}
match classify_stream_outcome(
stream_error,
std::mem::take(&mut state.accumulated_content),
has_stop_reason,
cancelled,
) {
StreamOutcome::Done(v) => return Ok(v),
StreamOutcome::Fail(msg) => return Err(RuntimeError::ApiStatus(msg)),
StreamOutcome::Retry(msg) => {
if non_429_attempts >= max_retries || cancel.is_cancelled() {
return Err(RuntimeError::ApiStatus(msg));
}
let delay = Duration::from_millis(1000 * 2u64.pow(non_429_attempts.min(6)));
non_429_attempts += 1;
attempt += 1;
last_err = msg.clone();
last_status = None;
tracing::warn!(
"in-stream API error, retrying {}/{} after {:?}: {}",
non_429_attempts, max_retries, delay, msg
);
let _ = tx.send(StreamEvent::Session(SessionEvent::Notice(
format!("⏳ API stream error — retrying ({}/{})…", non_429_attempts, max_retries),
)));
tokio::time::sleep(delay).await;
if cancel.is_cancelled() {
return Err(RuntimeError::Canceled);
}
}
}
} }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::telemetry::TelemetryLevel;
fn harness() -> (
ParseState,
mpsc::UnboundedSender<StreamEvent>,
mpsc::UnboundedReceiver<StreamEvent>,
) {
let (tx, rx) = mpsc::unbounded_channel();
(ParseState::new(), tx, rx)
}
fn make_ctx(tx: &mpsc::UnboundedSender<StreamEvent>) -> EventCtx<'_> {
EventCtx {
tx,
telemetry_level: TelemetryLevel::Full,
request_start: std::time::Instant::now(),
cache_ttl: crate::core::config::CacheTtl::FiveMinutes,
ttl_downgrade_notified: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
saw_1h_honored: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
request_has_1h_marker: true,
}
}
fn make_ctx_ttl<'a>(
tx: &'a mpsc::UnboundedSender<StreamEvent>,
ttl: crate::core::config::CacheTtl,
latch: &std::sync::Arc<std::sync::atomic::AtomicBool>,
honored: &std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> EventCtx<'a> {
EventCtx {
tx,
telemetry_level: TelemetryLevel::Full,
request_start: std::time::Instant::now(),
cache_ttl: ttl,
ttl_downgrade_notified: latch.clone(),
saw_1h_honored: honored.clone(),
request_has_1h_marker: true,
}
}
fn feed(lines: &[&str], state: &mut ParseState, ctx: &EventCtx) {
for line in lines {
process_data_line(line, state, ctx);
}
}
fn drain(rx: &mut mpsc::UnboundedReceiver<StreamEvent>) -> Vec<StreamEvent> {
let mut out = Vec::new();
while let Ok(ev) = rx.try_recv() {
out.push(ev);
}
out
}
#[test]
fn text_deltas_accumulate_then_flush_on_block_stop() {
let (mut state, tx, mut rx) = harness();
let ctx = make_ctx(&tx);
feed(
&[
r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}"#,
r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello, "}}"#,
r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"world"}}"#,
r#"data: {"type":"content_block_stop","index":0}"#,
],
&mut state,
&ctx,
);
assert_eq!(state.accumulated_content.len(), 1);
assert_eq!(state.accumulated_content[0], json!({"type":"text","text":"Hello, world"}));
assert!(state.current_text.is_empty());
let events = drain(&mut rx);
let texts: Vec<&str> = events
.iter()
.filter_map(|e| match e {
StreamEvent::Llm(LlmEvent::Text(t)) => Some(t.as_str()),
_ => None,
})
.collect();
assert_eq!(texts, vec!["Hello, ", "world"]);
}
#[test]
fn second_text_block_start_flushes_prior_text() {
let (mut state, tx, _rx) = harness();
let ctx = make_ctx(&tx);
feed(
&[
r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"first"}}"#,
r#"data: {"type":"content_block_start","index":1,"content_block":{"type":"text","text":""}}"#,
r#"data: {"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":"second"}}"#,
r#"data: {"type":"content_block_stop","index":1}"#,
],
&mut state,
&ctx,
);
assert_eq!(state.accumulated_content.len(), 2);
assert_eq!(state.accumulated_content[0], json!({"type":"text","text":"first"}));
assert_eq!(state.accumulated_content[1], json!({"type":"text","text":"second"}));
}
#[test]
fn tool_use_full_lifecycle() {
let (mut state, tx, mut rx) = harness();
let ctx = make_ctx(&tx);
feed(
&[
r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"toolu_01","name":"get_weather"}}"#,
r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{\"city\":"}}"#,
r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"\"Tokyo\"}"}}"#,
r#"data: {"type":"content_block_stop","index":0}"#,
],
&mut state,
&ctx,
);
assert!(!state.in_tool_use, "flag must clear on block_stop");
assert_eq!(state.accumulated_content.len(), 1);
assert_eq!(
state.accumulated_content[0],
json!({"type":"tool_use","id":"toolu_01","name":"get_weather","input":{"city":"Tokyo"}})
);
let events = drain(&mut rx);
assert!(matches!(
&events[0],
StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id })
if tool_name == "get_weather" && tool_id == "toolu_01"
));
assert!(matches!(
&events[1],
StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, .. }) if tool_id == "toolu_01"
));
assert!(matches!(
events.last().unwrap(),
StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. })
if tool_name == "get_weather" && input == &json!({"city":"Tokyo"})
));
}
#[test]
fn tool_use_invalid_json_yields_parse_error_object() {
let (mut state, tx, _rx) = harness();
let ctx = make_ctx(&tx);
feed(
&[
r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"toolu_02","name":"run"}}"#,
r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{\"cmd\": truncated"}}"#,
r#"data: {"type":"content_block_stop","index":0}"#,
],
&mut state,
&ctx,
);
let input = &state.accumulated_content[0]["input"];
let err = input["__parse_error"].as_str().expect("__parse_error key present");
assert!(err.starts_with("invalid tool input JSON:"));
}
#[test]
fn tool_use_empty_input_yields_empty_object() {
let (mut state, tx, _rx) = harness();
let ctx = make_ctx(&tx);
feed(
&[
r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"toolu_03","name":"noop"}}"#,
r#"data: {"type":"content_block_stop","index":0}"#,
],
&mut state,
&ctx,
);
assert_eq!(state.accumulated_content[0]["input"], json!({}));
}
#[test]
fn thinking_lifecycle_with_signature() {
let (mut state, tx, mut rx) = harness();
let ctx = make_ctx(&tx);
feed(
&[
r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}}"#,
r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"pondering"}}"#,
r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"sig_abc"}}"#,
r#"data: {"type":"content_block_stop","index":0}"#,
],
&mut state,
&ctx,
);
assert!(!state.in_thinking);
assert_eq!(
state.accumulated_content[0],
json!({"type":"thinking","thinking":"pondering","signature":"sig_abc"})
);
let events = drain(&mut rx);
assert!(matches!(
&events[0],
StreamEvent::Llm(LlmEvent::Thinking(t)) if t == "pondering"
));
}
#[test]
fn empty_thinking_block_never_emitted() {
let (mut state, tx, _rx) = harness();
let ctx = make_ctx(&tx);
feed(
&[
r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}}"#,
r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"sig_only"}}"#,
r#"data: {"type":"content_block_stop","index":0}"#,
],
&mut state,
&ctx,
);
assert!(state.accumulated_content.is_empty());
assert!(!state.in_thinking);
}
#[test]
fn message_delta_captures_usage_stop_reason_telemetry() {
let (mut state, tx, mut rx) = harness();
let ctx = make_ctx(&tx);
feed(
&[
r#"data: {"type":"message_delta","delta":{"stop_reason":"tool_use"},"usage":{"input_tokens":100,"output_tokens":50,"cache_read_input_tokens":300,"cache_creation_input_tokens":100,"cache_creation":{"ephemeral_5m_input_tokens":60,"ephemeral_1h_input_tokens":40}}}"#,
],
&mut state,
&ctx,
);
assert_eq!(state.telem_stop_reason.as_deref(), Some("tool_use"));
assert_eq!(state.telem_usage.input, 100);
assert_eq!(state.telem_usage.output, 50);
assert_eq!(state.telem_usage.cache_read, 300);
assert_eq!(state.telem_usage.cache_write, 100);
assert_eq!(state.telem_usage.cache_write_5m, Some(60));
assert_eq!(state.telem_usage.cache_write_1h, Some(40));
assert_eq!(state.telem_usage.hit_pct, 60.0);
let events = drain(&mut rx);
assert!(matches!(
&events[0],
StreamEvent::Session(SessionEvent::Usage { input_tokens: 100, output_tokens: 50, cache_read_input_tokens: 300, cache_creation_input_tokens: 100, cache_creation_5m: Some(60), cache_creation_1h: Some(40), model: None })
));
}
#[test]
fn live_split_from_start_survives_to_delta_emission() {
let (mut state, tx, mut rx) = harness();
let ctx = make_ctx(&tx);
feed(
&[
r#"data: {"type":"message_start","message":{"id":"msg_live","usage":{"input_tokens":4,"output_tokens":1,"cache_read_input_tokens":0,"cache_creation_input_tokens":1282,"cache_creation":{"ephemeral_5m_input_tokens":5,"ephemeral_1h_input_tokens":1277}}}}"#,
r#"data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"input_tokens":4,"output_tokens":42,"cache_read_input_tokens":0,"cache_creation_input_tokens":1282}}"#,
],
&mut state,
&ctx,
);
assert_eq!(state.telem_usage.cache_write, 1282);
assert_eq!(state.telem_usage.cache_write_5m, Some(5));
assert_eq!(state.telem_usage.cache_write_1h, Some(1277));
let events = drain(&mut rx);
let last_usage = events
.iter()
.filter(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })))
.next_back()
.expect("delta must emit a Usage event");
assert!(matches!(
last_usage,
StreamEvent::Session(SessionEvent::Usage {
output_tokens: 42,
cache_creation_input_tokens: 1282,
cache_creation_5m: Some(5),
cache_creation_1h: Some(1277),
..
})
));
}
#[test]
fn message_start_captures_msg_id_and_usage() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);
    let start_line = r#"data: {"type":"message_start","message":{"id":"msg_xyz","usage":{"input_tokens":10,"output_tokens":1,"cache_read_input_tokens":7,"cache_creation_input_tokens":3}}}"#;
    feed(&[start_line], &mut state, &ctx);

    // Every message_start counter is stashed on the state for later use.
    assert_eq!(state.telem_msg_id.as_deref(), Some("msg_xyz"));
    let captured = (
        state.msg_start_input,
        state.msg_start_output,
        state.msg_start_cache_read,
        state.msg_start_cache_create,
    );
    assert_eq!(captured, (Some(10), Some(1), Some(7), Some(3)));
    assert!(!state.usage_emitted);

    // ...but nothing is billed yet.
    let emitted_usage = drain(&mut rx)
        .iter()
        .any(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })));
    assert!(
        !emitted_usage,
        "message_start must capture only — emitting here double-counts"
    );
}
// message_start payload whose usage carries the 5m/1h cache-creation split
// (6 + 1097 = 1103) together with an output_tokens of 1.
// NOTE(review): the name suggests this shape was captured from a live API
// probe; confirm the provenance.
const PROBED_LIVE_START: &str = r#"data: {"type":"message_start","message":{"id":"msg_probe","usage":{"input_tokens":3,"cache_creation_input_tokens":1103,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":6,"ephemeral_1h_input_tokens":1097},"output_tokens":1}}}"#;
// The matching message_delta. It has the same input/cache totals and a final
// output_tokens of 11, and it has NO `cache_creation` split object.
const PROBED_LIVE_DELTA: &str = r#"data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"input_tokens":3,"cache_creation_input_tokens":1103,"cache_read_input_tokens":0,"output_tokens":11}}"#;
#[test]
fn one_usage_emission_per_request_live_shapes() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);
    feed(&[PROBED_LIVE_START, PROBED_LIVE_DELTA], &mut state, &ctx);

    // A start+delta pair bills once: the delta's output count plus the
    // start's 5m/1h split.
    let events = drain(&mut rx);
    let usages: Vec<&StreamEvent> = events
        .iter()
        .filter(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })))
        .collect();
    assert_eq!(usages.len(), 1, "exactly ONE Usage event per request");
    let only = usages[0];
    assert!(matches!(
        only,
        StreamEvent::Session(SessionEvent::Usage {
            input_tokens: 3,
            output_tokens: 11,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 1103,
            cache_creation_5m: Some(6),
            cache_creation_1h: Some(1097),
            model: None,
        })
    ));

    // A follow-up residual flush must not bill the request a second time.
    emit_residual_usage(&mut state, &ctx);
    let after_residual = drain(&mut rx);
    assert!(
        after_residual.is_empty(),
        "residual emission must be a no-op when the delta already emitted"
    );
}
#[test]
fn dead_stream_residual_bills_start_capture() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);

    // The stream dies right after message_start, so nothing is billed yet.
    feed(&[PROBED_LIVE_START], &mut state, &ctx);
    let early = drain(&mut rx);
    assert!(!early
        .iter()
        .any(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. }))));

    // The residual flush bills the start-time capture exactly once.
    emit_residual_usage(&mut state, &ctx);
    let events = drain(&mut rx);
    let usages: Vec<&StreamEvent> = events
        .iter()
        .filter(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })))
        .collect();
    assert_eq!(usages.len(), 1, "dead stream bills exactly once");
    assert!(matches!(
        usages[0],
        StreamEvent::Session(SessionEvent::Usage {
            input_tokens: 3,
            output_tokens: 1,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 1103,
            cache_creation_5m: Some(6),
            cache_creation_1h: Some(1097),
            model: None,
        })
    ));

    // A second residual flush must be a no-op.
    emit_residual_usage(&mut state, &ctx);
    assert!(drain(&mut rx).is_empty(), "residual must be idempotent");
}
#[test]
fn dead_stream_before_delta_still_latches_1h_honored() {
    let (mut state, tx, _rx) = harness();
    let honored = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    // Downgrade-notice latch. The assertion below does not read it, but
    // `make_ctx_ttl` requires it. The name deliberately does not start with
    // "not", because `&not…` gets mangled into `¬…` by HTML-entity decoding.
    let warned = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let ctx = make_ctx_ttl(&tx, crate::core::config::CacheTtl::Hybrid, &warned, &honored);

    // Only message_start arrives; the stream dies before any message_delta.
    feed(&[HONORED_START], &mut state, &ctx);
    assert!(
        honored.load(std::sync::atomic::Ordering::Relaxed),
        "saw_1h_honored must latch at message_start, not only at the delta"
    );
}
#[test]
fn all_zero_usage_emits_no_event() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);
    let zero_start = r#"data: {"type":"message_start","message":{"id":"msg_zero","usage":{"input_tokens":0,"output_tokens":0,"cache_read_input_tokens":0,"cache_creation_input_tokens":0}}}"#;
    let zero_delta = r#"data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"input_tokens":0,"output_tokens":0,"cache_read_input_tokens":0,"cache_creation_input_tokens":0}}"#;
    feed(&[zero_start, zero_delta], &mut state, &ctx);

    let saw_usage = drain(&mut rx)
        .iter()
        .any(|e| matches!(e, StreamEvent::Session(SessionEvent::Usage { .. })));
    assert!(!saw_usage, "all-zero usage must not emit a Usage event");
    // The stop reason is still recorded even though no Usage is emitted.
    assert_eq!(state.telem_stop_reason.as_deref(), Some("end_turn"));
}
#[test]
fn ttft_set_once_on_first_event() {
    let (mut state, tx, _rx) = harness();
    let ctx = make_ctx(&tx);
    assert!(state.telem_ttft.is_none());

    let first_line = r#"data: {"type":"message_start","message":{"id":"msg_1"}}"#;
    feed(&[first_line], &mut state, &ctx);
    let recorded = state.telem_ttft;
    assert!(recorded.is_some());
    assert!(state.first_event_seen);

    // Let wall-clock time advance so that an overwrite would likely be observable.
    std::thread::sleep(std::time::Duration::from_millis(5));
    let later_line = r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"x"}}"#;
    feed(&[later_line], &mut state, &ctx);
    assert_eq!(state.telem_ttft, recorded, "TTFT must not be overwritten by later events");
}
#[test]
fn tail_path_then_finalize_no_double_emit() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);
    let opening = [
        r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"toolu_tail","name":"ls"}}"#,
        r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{}"}}"#,
    ];
    feed(&opening, &mut state, &ctx);

    // The block-stop is handed straight to `process_data_line` (presumably as
    // the trailing-buffer path does), and then finalize runs. The already
    // closed tool_use must not be flushed a second time.
    process_data_line(r#"data: {"type":"content_block_stop","index":0}"#, &mut state, &ctx);
    state.finalize();

    let tool_block_count = state
        .accumulated_content
        .iter()
        .filter(|b| b["type"] == "tool_use")
        .count();
    assert_eq!(tool_block_count, 1, "tool_use block must be emitted exactly once");

    let tool_event_count = drain(&mut rx)
        .iter()
        .filter(|e| matches!(e, StreamEvent::Llm(LlmEvent::ToolUse { .. })))
        .count();
    assert_eq!(tool_event_count, 1);
}
#[test]
fn finalize_flushes_partial_text() {
    let (mut state, tx, _rx) = harness();
    let ctx = make_ctx(&tx);
    // A text delta that is never followed by content_block_stop.
    let dangling = r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"dangling"}}"#;
    feed(&[dangling], &mut state, &ctx);
    state.finalize();
    let expected = vec![json!({"type":"text","text":"dangling"})];
    assert_eq!(state.accumulated_content, expected);
}
#[test]
fn finalize_flushes_partial_thinking() {
    let thinking_start = r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}}"#;

    // Case 1: thinking text was streamed, but the block never closed.
    let (mut cut_state, cut_tx, _cut_rx) = harness();
    let cut_ctx = make_ctx(&cut_tx);
    let thinking_delta = r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"cut off"}}"#;
    feed(&[thinking_start, thinking_delta], &mut cut_state, &cut_ctx);
    cut_state.finalize();
    assert_eq!(
        cut_state.accumulated_content,
        vec![json!({"type":"thinking","thinking":"cut off","signature":""})]
    );

    // Case 2: the block opened but never carried any thinking text.
    let (mut empty_state, empty_tx, _empty_rx) = harness();
    let empty_ctx = make_ctx(&empty_tx);
    feed(&[thinking_start], &mut empty_state, &empty_ctx);
    empty_state.finalize();
    assert!(empty_state.accumulated_content.is_empty(), "empty thinking must be suppressed");
}
#[test]
fn finalize_flushes_partial_tool() {
    let (mut state, tx, _rx) = harness();
    let ctx = make_ctx(&tx);
    // A tool_use opens and its input JSON fully arrives, but no content_block_stop follows.
    let open_tool = r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"toolu_cut","name":"grep"}}"#;
    let input_chunk = r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{\"pattern\":\"x\"}"}}"#;
    feed(&[open_tool, input_chunk], &mut state, &ctx);
    state.finalize();
    let expected = json!({"type":"tool_use","id":"toolu_cut","name":"grep","input":{"pattern":"x"}});
    assert_eq!(state.accumulated_content, vec![expected]);
}
#[test]
fn finalize_is_idempotent() {
    // Partial tool_use: a second finalize must leave the content untouched.
    let (mut tool_state, tool_tx, _tool_rx) = harness();
    let tool_ctx = make_ctx(&tool_tx);
    let tool_lines = [
        r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"toolu_i","name":"once"}}"#,
        r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{}"}}"#,
    ];
    feed(&tool_lines, &mut tool_state, &tool_ctx);
    tool_state.finalize();
    let once = tool_state.accumulated_content.clone();
    tool_state.finalize();
    assert_eq!(tool_state.accumulated_content, once, "second finalize must be a no-op");

    // Partial text: finalizing twice still yields exactly one block.
    let (mut text_state, text_tx, _text_rx) = harness();
    let text_ctx = make_ctx(&text_tx);
    let text_line = r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"t"}}"#;
    feed(&[text_line], &mut text_state, &text_ctx);
    for _ in 0..2 {
        text_state.finalize();
    }
    assert_eq!(text_state.accumulated_content.len(), 1);
}
#[test]
fn done_marker_and_non_data_lines_skipped() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);
    // Every line except the last must be ignored: the [DONE] sentinel, an SSE
    // comment, an `event:` line, a blank separator, and a non-JSON payload.
    let mut lines = vec![
        "data: [DONE]",
        ": keepalive",
        "event: foo",
        "",
        "data: not json at all",
    ];
    lines.push(r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"ok"}}"#);
    feed(&lines, &mut state, &ctx);

    assert_eq!(state.current_text, "ok");
    let emitted = drain(&mut rx).len();
    assert_eq!(emitted, 1, "only the valid data line may produce events");
}
/// Comparable copy of the `ParseState` fields that `snapshot` captures, used by
/// the "must not mutate state" tests. Telemetry timing/usage fields and
/// `stream_error` are not included.
type StateSnapshot = (
Vec<Value>, // accumulated_content
String, // current_text
String, // current_tool_name
String, // current_tool_id
String, // current_tool_input_json
bool, // in_tool_use
String, // current_thinking
String, // current_thinking_signature
bool, // in_thinking
Option<String>, // telem_msg_id
Option<String>, // telem_stop_reason
);
fn snapshot(s: &ParseState) -> StateSnapshot {
(
s.accumulated_content.clone(),
s.current_text.clone(),
s.current_tool_name.clone(),
s.current_tool_id.clone(),
s.current_tool_input_json.clone(),
s.in_tool_use,
s.current_thinking.clone(),
s.current_thinking_signature.clone(),
s.in_thinking,
s.telem_msg_id.clone(),
s.telem_stop_reason.clone(),
)
}
#[test]
fn unknown_event_type_no_state_change() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);

    // Seed some in-progress text so there is state that could be clobbered.
    let seed = r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"pre"}}"#;
    feed(&[seed], &mut state, &ctx);
    drain(&mut rx);
    let baseline = snapshot(&state);

    let unknown = [
        r#"data: {"type":"ping"}"#,
        r#"data: {"type":"fnord","payload":[1,2,3]}"#,
    ];
    feed(&unknown, &mut state, &ctx);

    assert_eq!(snapshot(&state), baseline, "Unknown events must not mutate state");
    assert!(drain(&mut rx).is_empty(), "Unknown events must emit zero events");
    assert!(state.stream_error.is_none(), "ping/unknown must not set a stream error");
}
#[test]
fn error_event_sets_stream_error() {
    let (mut state, tx, _rx) = harness();
    let ctx = make_ctx(&tx);
    let overloaded = r#"data: {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}"#;
    feed(&[overloaded], &mut state, &ctx);

    let captured = state.stream_error.expect("error event must set stream_error");
    let text = &captured.message;
    assert!(text.contains("overloaded_error"), "must carry the error type: {}", text);
    assert!(text.contains("Overloaded"), "must carry the error message: {}", text);
    assert!(captured.retryable, "overloaded_error is a transient/retryable class");
}
#[test]
fn error_event_without_payload_still_sets_stream_error() {
    let (mut state, tx, _rx) = harness();
    let ctx = make_ctx(&tx);
    let bare_error = r#"data: {"type":"error"}"#;
    feed(&[bare_error], &mut state, &ctx);
    let captured = state.stream_error.is_some();
    assert!(
        captured,
        "error event with no payload must still set a stream error"
    );
}
#[test]
fn context_overflow_error_is_terminal_not_retryable() {
    // Error classes that must surface as a hard failure instead of a retry.
    const TERMINAL_KINDS: [&str; 3] =
        ["request_too_large", "invalid_request_error", "authentication_error"];
    for kind in TERMINAL_KINDS {
        let (mut state, tx, _rx) = harness();
        let ctx = make_ctx(&tx);
        let line = format!(
            r#"data: {{"type":"error","error":{{"type":"{kind}","message":"nope"}}}}"#
        );
        feed(&[line.as_str()], &mut state, &ctx);
        let err = state.stream_error.expect("error captured");
        assert!(!err.retryable, "{kind} must be terminal, not retryable");
    }
}
#[test]
fn outcome_retries_on_transient_error_event() {
    let transient = StreamError { message: "overloaded".to_string(), retryable: true };
    let (has_stop_reason, cancelled) = (false, false);
    let r = classify_stream_outcome(Some(transient), Vec::new(), has_stop_reason, cancelled);
    assert!(matches!(r, StreamOutcome::Retry(_)), "transient error must retry, got {r:?}");
}
#[test]
fn outcome_fails_on_terminal_error_event() {
    let terminal = StreamError { message: String::from("request_too_large"), retryable: false };
    let (has_stop_reason, cancelled) = (false, false);
    let outcome = classify_stream_outcome(Some(terminal), Vec::new(), has_stop_reason, cancelled);
    match outcome {
        StreamOutcome::Fail(m) => {
            // The failure text must carry the original error class.
            assert!(m.contains("request_too_large"));
        }
        other => panic!("terminal error must Fail, got {other:?}"),
    }
}
#[test]
fn outcome_fails_on_degenerate_empty_stream() {
    // No error event, no content, no stop_reason, not cancelled.
    let outcome = classify_stream_outcome(None, Vec::new(), false, false);
    let m = match outcome {
        StreamOutcome::Fail(m) => m,
        other => panic!("empty+no-stop must Fail, got {other:?}"),
    };
    let actionable = m.contains("context window") || m.contains("overloaded");
    assert!(actionable, "error must be actionable: {m}");
}
#[test]
fn outcome_done_when_empty_but_cancelled() {
    let cancelled = true;

    let clean = classify_stream_outcome(None, Vec::new(), false, cancelled);
    assert!(matches!(clean, StreamOutcome::Done(_)), "cancellation is a clean Done");

    // Cancellation also takes precedence over a retryable error event.
    let retryable = StreamError { message: "overloaded".to_string(), retryable: true };
    let with_error = classify_stream_outcome(Some(retryable), Vec::new(), false, cancelled);
    assert!(matches!(with_error, StreamOutcome::Done(_)), "cancel beats a retryable error");
}
#[test]
fn outcome_done_when_empty_but_has_stop_reason() {
    let has_stop_reason = true;
    let outcome = classify_stream_outcome(None, Vec::new(), has_stop_reason, false);
    assert!(matches!(outcome, StreamOutcome::Done(_)), "empty-but-stop_reason is valid");
}
#[test]
fn outcome_done_passes_content_through() {
    let content = vec![json!({"type":"text","text":"hi"})];
    // The content given to the classifier must appear verbatim under "content".
    match classify_stream_outcome(None, content.clone(), true, false) {
        StreamOutcome::Done(v) => assert_eq!(v["content"], json!(content)),
        other => panic!("non-empty content is always Done, got {other:?}"),
    }
}
#[test]
fn outcome_error_event_beats_nonempty_content() {
    let partial = vec![json!({"type":"text","text":"partial"})];
    let boom = StreamError { message: "boom".to_string(), retryable: false };
    let outcome = classify_stream_outcome(Some(boom), partial, true, false);
    assert!(matches!(outcome, StreamOutcome::Fail(_)), "error event wins over partial content");
}
#[test]
fn malformed_json_line_skipped() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);
    let pristine = snapshot(&state);

    // Truncated JSON, an unclosed brace, inverted braces, and a non-JSON emoji payload.
    let garbage = [
        r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_de"#,
        r#"data: {"#,
        r#"data: }{"#,
        "data: \u{1f4a5}",
    ];
    feed(&garbage, &mut state, &ctx);

    assert_eq!(
        snapshot(&state),
        pristine,
        "malformed lines must be skipped without state change"
    );
    assert!(drain(&mut rx).is_empty());
}
#[test]
fn multibyte_utf8_text_delta_end_to_end() {
    let expected = "✨ héllo";
    // The same text, once as literal UTF-8 and once as JSON \u escapes.
    let variants = [
        r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"✨ héllo"}}"#,
        r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"\u2728 h\u00e9llo"}}"#,
    ];
    let stop = r#"data: {"type":"content_block_stop","index":0}"#;

    for data_line in variants {
        let (mut state, tx, mut rx) = harness();
        let ctx = make_ctx(&tx);
        feed(&[data_line, stop], &mut state, &ctx);

        assert_eq!(
            state.accumulated_content,
            vec![json!({"type":"text","text":expected})],
            "accumulated text must be byte-identical for {data_line}"
        );

        let events = drain(&mut rx);
        let first = &events[0];
        assert!(
            matches!(first, StreamEvent::Llm(LlmEvent::Text(t)) if t.as_bytes() == expected.as_bytes()),
            "emitted text must be byte-identical for {data_line}"
        );
    }
}
#[test]
fn event_with_unknown_delta_subtype_ignored_gracefully() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);
    let known = r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"keep"}}"#;
    let unknown_subtype = r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"citations_delta","citation":{"x":1}}}"#;

    feed(&[known], &mut state, &ctx);
    drain(&mut rx);
    let baseline = snapshot(&state);

    // A delta subtype the parser does not model must be a silent no-op.
    feed(&[unknown_subtype], &mut state, &ctx);
    assert_eq!(snapshot(&state), baseline, "unknown delta subtype must not mutate state");
    assert!(drain(&mut rx).is_empty());
}
#[test]
fn tail_partial_line_typed_parse() {
    let (mut state, tx, mut rx) = harness();
    let ctx = make_ctx(&tx);
    let text_start = r#"data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}"#;
    feed(&[text_start], &mut state, &ctx);

    // Feed the final line from an owned buffer and drop the buffer before
    // finalizing. Presumably this proves the parsed text is owned by `state`
    // rather than borrowed from the line.
    let tail_buffer = String::from(
        r#"data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"tail ✨"}}"#,
    );
    process_data_line(&tail_buffer, &mut state, &ctx);
    drop(tail_buffer);
    state.finalize();

    assert_eq!(state.accumulated_content, vec![json!({"type":"text","text":"tail ✨"})]);
    let events = drain(&mut rx);
    let last = events.last().unwrap();
    assert!(matches!(last, StreamEvent::Llm(LlmEvent::Text(t)) if t == "tail ✨"));
}
// message_delta (end_turn) reporting a 100-token cache write and no cache
// reads. It has no `cache_creation` split object.
const LIVE_DELTA: &str = r#"data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"input_tokens":10,"output_tokens":5,"cache_read_input_tokens":0,"cache_creation_input_tokens":100}}"#;
// message_start where the entire 100-token cache write landed in the 5m bucket
// (0 in 1h) and nothing was read from cache. The downgrade-detector tests feed
// this as the "1h TTL not honored" shape.
const DOWNGRADE_START: &str = r#"data: {"type":"message_start","message":{"id":"msg_dg","usage":{"input_tokens":10,"output_tokens":1,"cache_read_input_tokens":0,"cache_creation_input_tokens":100,"cache_creation":{"ephemeral_5m_input_tokens":100,"ephemeral_1h_input_tokens":0}}}}"#;
/// Drain every pending event from `rx` and count the session notices that
/// report the 1h cache TTL downgrade.
fn count_downgrade_notices(rx: &mut mpsc::UnboundedReceiver<StreamEvent>) -> usize {
    let mut hits = 0;
    for event in drain(rx) {
        match &event {
            StreamEvent::Session(SessionEvent::Notice(t))
                if t.contains("1h cache TTL not honored") =>
            {
                hits += 1;
            }
            _ => {}
        }
    }
    hits
}
// message_start whose 100-token cache write is split 20 (5m) / 80 (1h). The
// nonzero 1h bucket is what these tests expect to latch `saw_1h_honored`.
const HONORED_START: &str = r#"data: {"type":"message_start","message":{"id":"msg_ok","usage":{"input_tokens":10,"output_tokens":1,"cache_read_input_tokens":0,"cache_creation_input_tokens":100,"cache_creation":{"ephemeral_5m_input_tokens":20,"ephemeral_1h_input_tokens":80}}}}"#;
#[test]
fn downgrade_detector_silent_for_healthy_hybrid_session() {
    let (mut state, tx, mut rx) = harness();
    // Session-wide latches shared by every request below. `warned` is the
    // downgrade-notice latch and `honored` records an observed 1h cache write.
    // The name `warned` avoids a leading "not", which HTML-entity decoding
    // mangles (`&not…` -> `¬…`) into an invalid token.
    let warned = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let honored = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));

    // Turn 1: tokens land in the 1h bucket, so the honored latch is set.
    let ctx = make_ctx_ttl(&tx, crate::core::config::CacheTtl::Hybrid, &warned, &honored);
    feed(&[HONORED_START, LIVE_DELTA], &mut state, &ctx);
    assert_eq!(count_downgrade_notices(&mut rx), 0, "turn 1 (1h honored)");
    assert!(honored.load(std::sync::atomic::Ordering::Relaxed), "latch set on 1h write");

    // Turn 2: once 1h has been honored, an all-5m write is a healthy Hybrid signature.
    let (mut state_t2, tx_t2, mut rx_t2) = harness();
    let ctx_t2 = make_ctx_ttl(&tx_t2, crate::core::config::CacheTtl::Hybrid, &warned, &honored);
    feed(&[DOWNGRADE_START, LIVE_DELTA], &mut state_t2, &ctx_t2);
    assert_eq!(count_downgrade_notices(&mut rx_t2), 0, "turn 2 (healthy hybrid signature)");

    // A later request in the same session stays silent as well.
    let (mut state2, tx2, mut rx2) = harness();
    let ctx2 = make_ctx_ttl(&tx2, crate::core::config::CacheTtl::Hybrid, &warned, &honored);
    feed(&[DOWNGRADE_START, LIVE_DELTA], &mut state2, &ctx2);
    assert_eq!(count_downgrade_notices(&mut rx2), 0, "later request, same session");
}
#[test]
fn downgrade_detector_fires_once_when_1h_never_honored() {
    for ttl in [crate::core::config::CacheTtl::OneHour, crate::core::config::CacheTtl::Hybrid] {
        // Each TTL mode gets fresh latches and models a new session in which
        // 1h caching is requested but never observed. The name `warned`
        // replaces a leading "not", which HTML-entity decoding mangles into
        // `¬…`.
        let (mut state, tx, mut rx) = harness();
        let warned = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let honored = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let ctx = make_ctx_ttl(&tx, ttl, &warned, &honored);
        feed(&[DOWNGRADE_START, LIVE_DELTA], &mut state, &ctx);
        assert_eq!(count_downgrade_notices(&mut rx), 1, "first occurrence under {ttl:?}");

        // Later requests share the latches and must stay silent.
        let (mut state_b, tx_b, mut rx_b) = harness();
        let ctx_b = make_ctx_ttl(&tx_b, ttl, &warned, &honored);
        feed(&[DOWNGRADE_START, LIVE_DELTA], &mut state_b, &ctx_b);
        assert_eq!(count_downgrade_notices(&mut rx_b), 0, "second occurrence under {ttl:?}");

        let (mut state2, tx2, mut rx2) = harness();
        let ctx2 = make_ctx_ttl(&tx2, ttl, &warned, &honored);
        feed(&[DOWNGRADE_START, LIVE_DELTA], &mut state2, &ctx2);
        assert_eq!(count_downgrade_notices(&mut rx2), 0, "next request, same session");
        assert_eq!(ctx2.cache_ttl, ttl);
    }
}
#[test]
fn downgrade_detector_silent_under_default_5m() {
    let (mut state, tx, mut rx) = harness();
    // `make_ctx` builds the default context (presumably the 5m TTL, per this test's name).
    let ctx = make_ctx(&tx);
    feed(&[DOWNGRADE_START, LIVE_DELTA], &mut state, &ctx);
    let notices = count_downgrade_notices(&mut rx);
    assert_eq!(notices, 0, "5m mode never warns");
}
#[test]
fn downgrade_detector_silent_when_1h_honored() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    let (mut state, tx, mut rx) = harness();
    let latch = Arc::new(AtomicBool::new(false));
    let honored = Arc::new(AtomicBool::new(false));
    let ctx = make_ctx_ttl(&tx, crate::core::config::CacheTtl::OneHour, &latch, &honored);
    feed(&[HONORED_START, LIVE_DELTA], &mut state, &ctx);

    assert_eq!(count_downgrade_notices(&mut rx), 0);
    let latched = latch.load(Ordering::Relaxed);
    assert!(!latched, "latch untouched when honored");
}
#[test]
fn downgrade_detector_silent_when_split_absent() {
    use std::sync::atomic::AtomicBool;
    use std::sync::Arc;

    let (mut state, tx, mut rx) = harness();
    let latch = Arc::new(AtomicBool::new(false));
    let honored = Arc::new(AtomicBool::new(false));
    let ctx = make_ctx_ttl(&tx, crate::core::config::CacheTtl::OneHour, &latch, &honored);
    // A lone delta: no message_start, so there is no 5m/1h split to judge.
    feed(&[LIVE_DELTA], &mut state, &ctx);
    assert_eq!(count_downgrade_notices(&mut rx), 0);
}
#[test]
fn downgrade_detector_silent_on_warm_restart() {
    let (mut state, tx, mut rx) = harness();
    // Downgrade-notice latch. It is named `warned` because a leading "not"
    // gets mangled by HTML-entity decoding (`&not…` -> `¬…`).
    let warned = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let honored = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let ctx = make_ctx_ttl(&tx, crate::core::config::CacheTtl::Hybrid, &warned, &honored);

    // 5000 tokens are read from cache and only a small 5m write occurs. The
    // assertions below treat this shape as healthy.
    feed(
        &[
            r#"data: {"type":"message_start","message":{"id":"msg_warm","usage":{"input_tokens":10,"output_tokens":1,"cache_read_input_tokens":5000,"cache_creation_input_tokens":100,"cache_creation":{"ephemeral_5m_input_tokens":100,"ephemeral_1h_input_tokens":0}}}}"#,
            r#"data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"input_tokens":10,"output_tokens":5,"cache_read_input_tokens":5000,"cache_creation_input_tokens":100}}"#,
        ],
        &mut state,
        &ctx,
    );
    assert_eq!(count_downgrade_notices(&mut rx), 0, "warm restart is healthy — no notice");
    assert!(!warned.load(std::sync::atomic::Ordering::Relaxed), "notice latch untouched");
}
#[test]
fn downgrade_detector_silent_without_1h_marker() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    let (mut state, tx, mut rx) = harness();
    let warned = Arc::new(AtomicBool::new(false));
    let honored = Arc::new(AtomicBool::new(false));
    // Built by hand so that `request_has_1h_marker` can be set to false.
    let ctx = EventCtx {
        tx: &tx,
        telemetry_level: TelemetryLevel::Full,
        request_start: std::time::Instant::now(),
        cache_ttl: crate::core::config::CacheTtl::Hybrid,
        ttl_downgrade_notified: Arc::clone(&warned),
        saw_1h_honored: Arc::clone(&honored),
        request_has_1h_marker: false,
    };
    feed(&[DOWNGRADE_START, LIVE_DELTA], &mut state, &ctx);

    assert_eq!(count_downgrade_notices(&mut rx), 0, "no 1h marker in request → silent");
    assert!(!warned.load(Ordering::Relaxed));
}
}