atomcode_core/provider/openai.rs
1use std::pin::Pin;
2
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use futures::stream::StreamExt;
6use futures::Stream;
7use reqwest::Client;
8use serde::Deserialize;
9use serde_json::json;
10
11use crate::config::provider::ProviderConfig;
12use crate::conversation::message::{Message, MessageContent, Role};
13use crate::stream::StreamEvent;
14use crate::tool::ToolDef;
15
16use crate::auth::oauth::{get_stored_auth, refresh_access_token};
17use crate::coding_plan::crypto::{self, SignError, SignInput};
18use crate::i18n::{t, Msg};
19
20use super::{LlmProvider, ReasoningPolicy};
21
22/// Compute the signing headers (if any) for an outbound request.
23///
24/// Returns:
25/// - `Ok(vec![])` — host doesn't require signing (the common case for
26/// user-configured providers); caller proceeds unchanged.
27/// - `Ok(non-empty)` — host requires signing; caller merges these
28/// headers onto the request before `.send().await`.
29/// - `Err(_)` — host requires signing but we cannot produce a valid
30/// signature (signer unavailable, no stored auth, etc.). Caller
31/// surfaces the error to the user.
32///
33/// `override_auth` is a test seam: production callers pass `None` and
34/// the function reads `get_stored_auth()`.
35fn build_codingplan_headers(
36 base_url: &str,
37 body_bytes: &[u8],
38 override_auth: Option<(&str, &str)>,
39) -> Result<Vec<(&'static str, String)>> {
40 if !crypto::is_atomgit_gateway(base_url) {
41 return Ok(Vec::new());
42 }
43
44 // `CpAuthRequired` (no stored auth, or empty user.id /
45 // access_token) is a separate failure mode from
46 // `CpOfficialBuildRequired` (open-source build / unavailable
47 // signer). Users on an official build with no `~/.atomcode/auth.toml`
48 // would otherwise see the misleading "need official build"
49 // message — the build IS official, they just haven't logged in
50 // yet. Steer them to `/codingplan` instead.
51 let (user_id_string, token_string);
52 let (user_id, oauth_token) = match override_auth {
53 Some((uid, tok)) => (uid, tok),
54 None => {
55 let auth = get_stored_auth()
56 .ok_or_else(|| anyhow::anyhow!("{}", t(Msg::CpAuthRequired)))?;
57 user_id_string = auth.user.id.clone();
58 token_string = auth.access_token.clone();
59 (user_id_string.as_str(), token_string.as_str())
60 }
61 };
62
63 if user_id.is_empty() || oauth_token.is_empty() {
64 return Err(anyhow::anyhow!("{}", t(Msg::CpAuthRequired)));
65 }
66
67 let path = url::Url::parse(base_url)
68 .ok()
69 .map(|u| u.path().to_string())
70 .unwrap_or_else(|| "/v1/chat/completions".to_string());
71 let path = if path.ends_with("/chat/completions") {
72 path
73 } else {
74 format!("{}/chat/completions", path.trim_end_matches('/'))
75 };
76
77 let mut nonce = [0u8; 16];
78 getrandom::getrandom(&mut nonce)
79 .map_err(|e| anyhow::anyhow!("nonce generation failed: {e}"))?;
80 let ts = std::time::SystemTime::now()
81 .duration_since(std::time::UNIX_EPOCH)
82 .map_err(|e| anyhow::anyhow!("system clock before UNIX epoch: {e}"))?
83 .as_secs();
84
85 let input = SignInput {
86 method: "POST",
87 path: &path,
88 body: body_bytes,
89 oauth_token,
90 user_id,
91 timestamp_unix: ts,
92 nonce,
93 };
94
95 match crypto::signer().sign(input) {
96 Ok(out) => Ok(out.headers),
97 Err(SignError::Unavailable) => {
98 Err(anyhow::anyhow!("{}", t(Msg::CpOfficialBuildRequired)))
99 }
100 Err(SignError::Derive(detail)) => Err(anyhow::anyhow!(
101 "{} (signing-key derivation: {})",
102 t(Msg::CpOfficialBuildRequired),
103 detail
104 )),
105 }
106}
107
108pub struct OpenAiProvider {
109 client: Client,
110 /// Shared so `chat_stream` can refresh the OAuth access_token in-place
111 /// when an AtomGit-gateway request comes back 401, without rebuilding
112 /// the whole provider. For non-AtomGit providers this stays as the
113 /// user-supplied API key for the lifetime of the process — there's
114 /// no auth flow for those.
115 api_key: std::sync::Arc<tokio::sync::RwLock<String>>,
116 model: String,
117 base_url: String,
118 max_tokens: usize,
119 /// Kimi-family thinking knob: `thinking.type` in the request body.
120 /// Only emitted when the user configures it — other OpenAI-compatible
121 /// gateways may reject unknown top-level fields.
122 thinking_type: Option<String>,
123 /// Kimi K2.6 Preserved Thinking: `thinking.keep` in the request body.
124 thinking_keep: Option<String>,
125 /// User-provided override for the reasoning-history echo policy. When
126 /// `Some`, bypasses the auto-detect heuristic entirely. Parsed from
127 /// `ProviderConfig::reasoning_history` at construction so bad values
128 /// fail early at load time with a clear error, not silently mid-turn.
129 reasoning_history_override: Option<ReasoningPolicy>,
130 /// Whether the active model accepts image inputs. Drives `MultiPart`
131 /// serialisation: vision-capable → OpenAI image_url schema, text-only
132 /// → flat string. Computed once from `ProviderConfig::accepts_images()`
133 /// at construction; a `/model` switch rebuilds the provider so this
134 /// stays in sync with the live config.
135 supports_vision: bool,
136}
137
138impl OpenAiProvider {
139 pub fn new(config: &ProviderConfig) -> Result<Self> {
140 let api_key = config
141 .api_key
142 .clone()
143 .context("OpenAI provider requires an api_key")?;
144 let reasoning_history_override = match config.reasoning_history.as_deref() {
145 None => None,
146 Some(s) => match s.trim().to_ascii_lowercase().as_str() {
147 "include" => Some(ReasoningPolicy::Include),
148 "exclude" => Some(ReasoningPolicy::Exclude),
149 other => anyhow::bail!(
150 "Invalid `reasoning_history` value {:?} for provider type '{}' — \
151 expected \"include\" or \"exclude\" (unset = use auto-detect)",
152 other,
153 config.provider_type,
154 ),
155 },
156 };
157 Ok(Self {
158 client: super::build_http_client(config.user_agent.as_deref(), config.skip_tls_verify),
159 api_key: std::sync::Arc::new(tokio::sync::RwLock::new(api_key)),
160 model: config.model.clone(),
161 base_url: config
162 .base_url
163 .clone()
164 .unwrap_or_else(|| "https://api.openai.com/v1".to_string()),
165 // Cap at 16K: prevents models from spending 250s on thinking
166 // with zero visible output. CC uses fixed 16-32K, not proportional.
167 max_tokens: config
168 .max_tokens
169 .unwrap_or((config.context_window / 4).clamp(8_000, 16_384)),
170 thinking_type: config.thinking_type.clone(),
171 thinking_keep: config.thinking_keep.clone(),
172 reasoning_history_override,
173 supports_vision: config.accepts_images(),
174 })
175 }
176
177 /// Derive the reasoning echo policy from model name / base_url.
178 /// - `kimi-*` / base_url contains `moonshot` → Include (Moonshot requires
179 /// reasoning_content on every assistant tool_call or returns 400).
180 /// - `deepseek-reasoner` / `deepseek-r1` (V3 family) → Exclude (DeepSeek
181 /// V3 rejects the request if reasoning_content is echoed back).
182 /// - `deepseek-v4*` (V4 family thinking mode) → Include. DeepSeek flipped
183 /// the contract in V4: thinking-mode requests with tool calls now
184 /// REQUIRE reasoning_content on every historical assistant tool_call
185 /// message, or the API returns 400 "The `reasoning_content` in the
186 /// thinking mode must be passed back to the API". See
187 /// <https://api-docs.deepseek.com/zh-cn/guides/thinking_mode>.
188 /// - Other OpenAI-compatible endpoints → Exclude (safe default; normal
189 /// OpenAI models don't emit reasoning_content, so there's nothing to
190 /// strip, and non-thinking models typically ignore the field).
191 fn derive_reasoning_policy(model: &str, base_url: &str) -> ReasoningPolicy {
192 let m = model.to_ascii_lowercase();
193 let u = base_url.to_ascii_lowercase();
194 if m.contains("deepseek-reasoner") || m.contains("deepseek-r1") {
195 return ReasoningPolicy::Exclude;
196 }
197 if m.contains("deepseek-v4") {
198 return ReasoningPolicy::Include;
199 }
200 if m.starts_with("kimi-")
201 || m.starts_with("moonshot")
202 || u.contains("moonshot")
203 || u.contains("kimi")
204 || u.contains("xiaomimimo")
205 || u.contains("mimo")
206 {
207 return ReasoningPolicy::Include;
208 }
209 ReasoningPolicy::Exclude
210 }
211
212 /// Build Kimi's `thinking` request-body object from the two flat
213 /// config fields. Returns `None` when both are unset so the caller
214 /// omits the whole key — safer for non-Kimi gateways that might
215 /// error on an unknown top-level `thinking`.
216 fn thinking_body_value(
217 thinking_type: Option<&str>,
218 thinking_keep: Option<&str>,
219 ) -> Option<serde_json::Value> {
220 if thinking_type.is_none() && thinking_keep.is_none() {
221 return None;
222 }
223 let mut obj = serde_json::Map::new();
224 if let Some(t) = thinking_type {
225 obj.insert("type".into(), json!(t));
226 }
227 if let Some(k) = thinking_keep {
228 obj.insert("keep".into(), json!(k));
229 }
230 Some(serde_json::Value::Object(obj))
231 }
232
233 /// `supports_vision` toggles how `MessageContent::MultiPart` historical
234 /// turns are serialised. When the target model accepts images, the
235 /// content is emitted as the OpenAI vision schema (array of
236 /// `image_url` + `text` blocks). When it doesn't (text-only proxies
237 /// like GLM-5.1 on ModelArts), `MultiPart` is degraded to a flat
238 /// string — keeps the conversation replayable across `/model`
239 /// switches between vision-capable and text-only providers without
240 /// throwing the upstream's `invalid field(s): text, type` 400.
241 fn format_messages(
242 messages: &[Message],
243 reasoning_policy: ReasoningPolicy,
244 supports_vision: bool,
245 ) -> Vec<serde_json::Value> {
246 messages
247 .iter()
248 .filter_map(|m| {
249 match &m.content {
250 MessageContent::Text(s) => {
251 // Tool role with plain Text is invalid for the OpenAI API —
252 // tool results must use MessageContent::ToolResult.
253 let role = match m.role {
254 Role::System => "system",
255 Role::User => "user",
256 Role::Assistant => "assistant",
257 Role::Tool => return None,
258 };
259 // Skip empty messages
260 if s.trim().is_empty() {
261 return None;
262 }
263 let mut obj = json!({"role": role, "content": s});
264 // DeepSeek V4 tool-call round: per official docs, when a
265 // turn had tool_calls ANYWHERE, ALL reasoning_content from
266 // that turn (including the final-answer text's reasoning)
267 // must be echoed in every subsequent request — 400
268 // otherwise. Our Text variant doesn't persist per-turn
269 // reasoning, so emit a placeholder under Include. The
270 // no-tool-call case (image: 思维链 dropped) is a "may be
271 // sent, will be ignored" spec, not a rejection — safe to
272 // always emit. Kimi only validates tool_call messages, so
273 // the extra key on Text is accepted there too.
274 if matches!(m.role, Role::Assistant)
275 && matches!(reasoning_policy, ReasoningPolicy::Include)
276 {
277 obj["reasoning_content"] = json!("(no reasoning recorded)");
278 }
279 Some(obj)
280 }
281 MessageContent::AssistantWithToolCalls {
282 text,
283 tool_calls,
284 reasoning_content,
285 // Anthropic-only field; OpenAI-style endpoints don't
286 // accept `thinking` content blocks. We persist them
287 // for cross-provider switches but don't emit here.
288 thinking_blocks: _,
289 } => {
290 if tool_calls.is_empty() {
291 // No tool calls — send as plain assistant text
292 let t = text.as_deref().unwrap_or("");
293 if t.is_empty() {
294 return None;
295 }
296 let mut obj = json!({"role": "assistant", "content": t});
297 if matches!(reasoning_policy, ReasoningPolicy::Include) {
298 let echo = reasoning_content
299 .as_deref()
300 .filter(|s| !s.is_empty())
301 .unwrap_or("(no reasoning recorded)");
302 obj["reasoning_content"] = json!(echo);
303 }
304 return Some(obj);
305 }
306 let mut msg = json!({"role": "assistant"});
307 // Always include content field — some APIs (DeepSeek/SiliconFlow)
308 // reject messages without it even when tool_calls is present.
309 msg["content"] = json!(text.as_deref().unwrap_or(""));
310 // Thinking-model providers require reasoning_content to
311 // appear on every assistant tool_call message in history.
312 // Kimi only checks the key is present (empty ok). DeepSeek
313 // V4 additionally rejects an empty string ("must be passed
314 // back to the API"), so when we have no captured reasoning
315 // — cross-provider handoff (glm→deepseek), pre-fix session,
316 // or a non-thinking model that still tool-called — we emit
317 // a short non-empty placeholder. Both APIs accept any
318 // non-empty string, DeepSeek does the opposite of Kimi for
319 // Exclude so this block is gated on policy.
320 if matches!(reasoning_policy, ReasoningPolicy::Include) {
321 let echo = reasoning_content
322 .as_deref()
323 .filter(|s| !s.is_empty())
324 .unwrap_or("(no reasoning recorded)");
325 msg["reasoning_content"] = json!(echo);
326 }
327 msg["tool_calls"] = json!(tool_calls
328 .iter()
329 .map(|tc| {
330 // Ensure arguments is valid JSON — some APIs reject invalid JSON strings.
331 let args =
332 if serde_json::from_str::<serde_json::Value>(&tc.arguments)
333 .is_ok()
334 {
335 tc.arguments.clone()
336 } else {
337 // Try repair; if still invalid, wrap as a simple object
338 let repaired = repair_tool_args(&tc.arguments);
339 if serde_json::from_str::<serde_json::Value>(&repaired)
340 .is_ok()
341 {
342 repaired
343 } else {
344 json!({"input": tc.arguments}).to_string()
345 }
346 };
347 json!({
348 "id": tc.id,
349 "type": "function",
350 "function": {
351 "name": tc.name,
352 "arguments": args,
353 }
354 })
355 })
356 .collect::<Vec<_>>());
357 Some(msg)
358 }
359 MessageContent::MultiPart { text, images } => {
360 if supports_vision {
361 let mut parts: Vec<serde_json::Value> = Vec::new();
362 for img in images {
363 parts.push(json!({
364 "type": "image_url",
365 "image_url": {
366 "url": format!(
367 "data:{};base64,{}",
368 img.media_type, img.data
369 ),
370 }
371 }));
372 }
373 if let Some(t) = text {
374 parts.push(json!({"type": "text", "text": t}));
375 }
376 Some(json!({"role": "user", "content": parts}))
377 } else {
378 // Degrade to text-only — the model's wire schema
379 // doesn't support image blocks. The user's
380 // caption survives (and already has `[Image #N]`
381 // markers from the input buffer); the image bytes
382 // simply aren't representable here.
383 let content = match text {
384 Some(t) if !t.is_empty() => t.clone(),
385 _ => "[image attached]".to_string(),
386 };
387 Some(json!({"role": "user", "content": content}))
388 }
389 }
390 MessageContent::ToolResult(r) => {
391 if r.call_id.is_empty() {
392 return None;
393 }
394 Some(json!({
395 "role": "tool",
396 "tool_call_id": r.call_id,
397 "content": r.output,
398 }))
399 }
400 MessageContent::ToolResultRef(r) => {
401 if r.call_id.is_empty() {
402 return None;
403 }
404 Some(json!({
405 "role": "tool",
406 "tool_call_id": r.call_id,
407 "content": r.summary,
408 }))
409 }
410 }
411 })
412 .collect()
413 }
414}
415
416#[derive(Deserialize)]
417struct ChatChunk {
418 #[serde(default)]
419 choices: Vec<ChunkChoice>,
420 usage: Option<ChunkUsage>,
421}
422
423#[derive(Deserialize)]
424struct ChunkUsage {
425 prompt_tokens: Option<usize>,
426 completion_tokens: Option<usize>,
427 // Provider-specific cache fields (different providers use different names):
428 // OpenAI: prompt_tokens_details.cached_tokens
429 // DeepSeek/SiliconFlow: prompt_cache_hit_tokens
430 // Zhipu: cached_tokens
431 prompt_cache_hit_tokens: Option<usize>,
432 cached_tokens: Option<usize>,
433 prompt_tokens_details: Option<PromptTokensDetails>,
434}
435
436#[derive(Deserialize)]
437struct PromptTokensDetails {
438 cached_tokens: Option<usize>,
439}
440
441#[derive(Deserialize)]
442struct ChunkChoice {
443 delta: ChunkDelta,
444 finish_reason: Option<String>,
445}
446
447#[derive(Deserialize)]
448struct ChunkDelta {
449 content: Option<String>,
450 /// MiniMax M2.7 / DeepSeek R1 send thinking via this field. We forward
451 /// it as `StreamEvent::Reasoning` so `TurnRunner` can promote it to
452 /// the final text if `content` ends up empty — some gateways route
453 /// *entire* responses to `reasoning_content` for these models, which
454 /// previously showed up as a silent 0-token "Nailed it" turn.
455 reasoning_content: Option<String>,
456 tool_calls: Option<Vec<DeltaToolCall>>,
457}
458
459#[derive(Deserialize)]
460struct DeltaToolCall {
461 index: Option<usize>,
462 id: Option<String>,
463 function: Option<DeltaFunction>,
464}
465
466#[derive(Deserialize)]
467struct DeltaFunction {
468 name: Option<String>,
469 arguments: Option<String>,
470}
471
472#[derive(Deserialize)]
473struct ChatCompletionResponse {
474 #[serde(default)]
475 choices: Vec<ResponseChoice>,
476 usage: Option<ChunkUsage>,
477}
478
479#[derive(Deserialize)]
480struct ResponseChoice {
481 message: Option<ResponseMessage>,
482 finish_reason: Option<String>,
483}
484
485#[derive(Deserialize)]
486struct ResponseMessage {
487 content: Option<String>,
488 reasoning_content: Option<String>,
489}
490
491#[async_trait]
492impl LlmProvider for OpenAiProvider {
493 fn chat_stream(
494 &self,
495 messages: &[Message],
496 tools: Option<&[ToolDef]>,
497 ) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>>> {
498 let url = normalize_base_url(&self.base_url);
499 let mut body = json!({
500 "model": self.model,
501 "messages": Self::format_messages(messages, self.reasoning_history_policy(), self.supports_vision),
502 "stream": true,
503 "stream_options": { "include_usage": true },
504 "max_tokens": self.max_tokens,
505 });
506
507 if let Some(tool_defs) = tools {
508 if !tool_defs.is_empty() {
509 body["tools"] = json!(tool_defs
510 .iter()
511 .map(|td| json!({
512 "type": "function",
513 "function": {
514 "name": td.name,
515 "description": td.description,
516 "parameters": td.parameters,
517 }
518 }))
519 .collect::<Vec<_>>());
520 // Allow the model to decide whether to call multiple tools in parallel
521 }
522 }
523
524 // Kimi K2.5 / K2.6 top-level `thinking` object. Only sent when the
525 // user configured it — other OpenAI-compatible gateways may reject
526 // unknown fields, and omitting lets Kimi's default behavior apply.
527 if let Some(th) =
528 Self::thinking_body_value(self.thinking_type.as_deref(), self.thinking_keep.as_deref())
529 {
530 body["thinking"] = th;
531 }
532
533 let policy = crate::provider::retry::RetryPolicy::default_policy();
534
535 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
536
537 // ── TEMP WIRE-DUMP (debug only) ────────────────────────────────
538 // Set ATOMCODE_WIRE_DUMP=1 to dump every outbound LLM request body
539 // to ~/.atomcode/wire-dump/<timestamp>.json so we can verify what
540 // litellm / the proxy actually receives. Used to diagnose
541 // "tool_call results appear empty to the model" — comparing the
542 // wire body's `messages[N].content` against the conversation
543 // snapshot proves whether atomcode or the proxy is the source of
544 // truncation. Remove once root-caused.
545 if std::env::var("ATOMCODE_WIRE_DUMP").ok().as_deref() == Some("1") {
546 if let Ok(home) = std::env::var("HOME") {
547 let dir = std::path::PathBuf::from(home).join(".atomcode/wire-dump");
548 let _ = std::fs::create_dir_all(&dir);
549 let ts = std::time::SystemTime::now()
550 .duration_since(std::time::UNIX_EPOCH)
551 .map(|d| format!("{}.{:09}", d.as_secs(), d.subsec_nanos()))
552 .unwrap_or_else(|_| "0".to_string());
553 let path = dir.join(format!("{}.json", ts));
554 if let Ok(serialized) = serde_json::to_string_pretty(&body) {
555 let _ = std::fs::write(&path, serialized);
556 }
557 }
558 }
559 // ────────────────────────────────────────────────────────────────
560
561 // Provider truncation detector input: char count of message contents
562 // and tool_call arguments. We compare this to provider-reported
563 // prompt_tokens; if the ratio is way above any tokenizer can
564 // explain, the proxy is silently dropping content (e.g. GitCode
565 // litellm's hidden ~6.2K cap on glm-5 — 5/8 atomgr session).
566 let body_content_chars = sum_message_content_chars(&body);
567
568 // Move the pieces needed to rebuild the request into the task — the
569 // outer mid-stream retry loop reconstructs the builder on each
570 // attempt because `RequestBuilder` is single-use.
571 let client = self.client.clone();
572 let api_key = self.api_key.clone();
573 let provider_label = self.model.clone();
574 let base_url_for_signing = self.base_url.clone();
575
576 tokio::spawn(async move {
577 // Mid-stream retry: when the provider opens the stream but the
578 // chunked body errors out BEFORE any SSE `data:` line is parsed,
579 // it's safe to redo the whole request — no text/tool-call has
580 // been committed to the conversation, no UI delta has been
581 // emitted. Common cause: self-hosted endpoints that reset the
582 // connection at request open under load (the failure mode
583 // `error decoding response body` surfaces as). Once `data:` has
584 // been seen, retry would produce duplicated output, so the
585 // error is surfaced verbatim with a humanised explanation.
586 const MAX_STREAM_ATTEMPTS: u32 = 2;
587 let mut attempt: u32 = 0;
588 // AtomGit-gateway 401 reactive refresh: one shot per
589 // chat_stream call. The proactive `load_auth_token` path
590 // (provider/mod.rs) only refreshes when the local clock
591 // says the token is expired, which misses server-side
592 // revocation/rotation/clock-skew failures — those surface
593 // as a hard 401 mid-conversation. Without this flag a
594 // dead-loop is theoretically possible if the broker's
595 // refresh response still leaves us 401-able.
596 let mut auth_retry_used = false;
597 'retry: loop {
598 attempt += 1;
599 let body_bytes = match serde_json::to_vec(&body) {
600 Ok(b) => b,
601 Err(e) => {
602 let _ = tx.send(Ok(StreamEvent::Error(format!(
603 "Failed to serialize chat request body: {e}"
604 ))));
605 return;
606 }
607 };
608 let extra_headers = match build_codingplan_headers(&base_url_for_signing, &body_bytes, None) {
609 Ok(h) => h,
610 Err(e) => {
611 let _ = tx.send(Ok(StreamEvent::Error(format!("{e:#}"))));
612 return;
613 }
614 };
615 // Snapshot the current token. After a successful auth
616 // refresh below, the shared `api_key` will hold the new
617 // value and the next iteration picks it up here.
618 let current_token = api_key.read().await.clone();
619 let mut request = client
620 .post(&url)
621 .header("Authorization", format!("Bearer {}", current_token))
622 .header("Content-Type", "application/json")
623 .body(body_bytes);
624 for (name, value) in extra_headers {
625 request = request.header(name, value);
626 }
627
628 let response = match crate::provider::retry::send_with_retry(request, &policy).await
629 {
630 Ok(resp) => resp,
631 Err(e) => {
632 let _ = tx.send(Ok(StreamEvent::Error(format!(
633 "Connection failed: {}",
634 e
635 ))));
636 return;
637 }
638 };
639
640 if !response.status().is_success() {
641 let status = response.status();
642
643 // AtomGit-gateway 401: try `refresh_access_token`
644 // once. On success, write the new token back to the
645 // shared slot and retry the SAME request (doesn't
646 // count against MAX_STREAM_ATTEMPTS — that budget is
647 // for transient body-decode failures, not auth).
648 if status == reqwest::StatusCode::UNAUTHORIZED
649 && !auth_retry_used
650 && crypto::is_atomgit_gateway(&base_url_for_signing)
651 {
652 auth_retry_used = true;
653 // Drain the response body so the connection can
654 // be returned to the pool cleanly; otherwise
655 // hyper logs a "connection reset" on drop.
656 let _ = response.text().await;
657 let api_key_handle = api_key.clone();
658 let refresh_outcome =
659 tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
660 let auth = get_stored_auth().ok_or_else(|| {
661 anyhow::anyhow!("no stored auth — cannot refresh")
662 })?;
663 let new_auth = refresh_access_token(&auth)?;
664 Ok(new_auth.access_token)
665 })
666 .await;
667 if let Ok(Ok(new_token)) = refresh_outcome {
668 *api_key_handle.write().await = new_token;
669 // Auth retry doesn't burn a stream-attempt
670 // slot — mid-stream retries are for
671 // body-decode failures, which is orthogonal.
672 attempt -= 1;
673 continue 'retry;
674 }
675 // Fall through to the friendly-error branch
676 // below; response is already consumed so build
677 // the error from `status` alone.
678 let _ = tx.send(Ok(StreamEvent::Error(
679 t(Msg::ChatAuthExpired).to_string(),
680 )));
681 return;
682 }
683
684 let resp_url = response.url().to_string();
685 let body = response.text().await.unwrap_or_default();
686 // 401 from an AtomGit gateway after the auth-retry
687 // shot was already used (or refresh-then-still-401):
688 // the raw server message ("Gitcode auth: token
689 // rejected (status=401)") is not actionable. Swap
690 // for an i18n hint pointing at /login. Non-atomgit
691 // gateways (user-supplied sk-... keys) keep the
692 // verbatim message so developers see the real
693 // diagnostic.
694 let formatted = if status == reqwest::StatusCode::UNAUTHORIZED
695 && crypto::is_atomgit_gateway(&base_url_for_signing)
696 {
697 t(Msg::ChatAuthExpired).to_string()
698 } else {
699 let msg = super::extract_error_message(&body);
700 super::format_http_error(status, &resp_url, &msg)
701 };
702 let _ = tx.send(Ok(StreamEvent::Error(formatted)));
703 return;
704 }
705
706 // Per-attempt local state. Reset on each retry so a partial
707 // first attempt's accumulated bytes don't leak into the
708 // second attempt's parser.
709 let mut byte_buffer: Vec<u8> = Vec::with_capacity(4096);
710 let mut buffer = String::new();
711 let mut byte_stream = response.bytes_stream();
712
713 // ── TEMP RESPONSE WIRE-DUMP (debug only) ──────────────
714 // Pairs with the request dump — captures the raw SSE
715 // bytes coming back so we can verify whether litellm /
716 // proxy returns a standard OpenAI stream format. Files
717 // are named with `_resp` suffix to pair with the
718 // request dump preceding them. Bytes are appended so
719 // multi-chunk streams accumulate into one file.
720 let resp_dump_path: Option<std::path::PathBuf> =
721 if std::env::var("ATOMCODE_WIRE_DUMP").ok().as_deref() == Some("1") {
722 std::env::var("HOME").ok().map(|home| {
723 let dir = std::path::PathBuf::from(home).join(".atomcode/wire-dump");
724 let _ = std::fs::create_dir_all(&dir);
725 let ts = std::time::SystemTime::now()
726 .duration_since(std::time::UNIX_EPOCH)
727 .map(|d| format!("{}.{:09}", d.as_secs(), d.subsec_nanos()))
728 .unwrap_or_else(|_| "0".to_string());
729 dir.join(format!("{}_resp.sse", ts))
730 })
731 } else {
732 None
733 };
734 // ────────────────────────────────────────────────────────
735 let mut tool_calls: Vec<(String, String, String)> = Vec::new();
736 let mut last_usage: Option<crate::stream::TokenUsage> = None;
737 let mut saw_data_line = false;
738 let mut saw_valid_chunk = false;
739 let mut invalid_chunk_samples: Vec<String> = Vec::new();
740 // Track how much real content the stream actually
741 // produced. Used by the abrupt-close branch below to
742 // distinguish:
743 // * many chunks + much content → real mid-output
744 // truncation (table cut, list mid-row, …) → keep
745 // emitting Done(truncated=true) so the agent's
746 // "resume where you left off" retry can fire.
747 // * 0-2 chunks, short text, no tool calls → gateway
748 // streamed a single error blob like 「请求负载
749 // 过高,请稍后再试」 and hung up. NOT a real
750 // truncation; emit StreamEvent::Error so the
751 // agent's rate-limit / failure path takes over
752 // instead of looping the resume retry.
753 let mut content_chunks: usize = 0;
754 let mut accumulated_content = String::new();
755 // One-shot guard: if the provider's prompt_tokens looks
756 // implausibly low for our content size, log a warning once
757 // per request stream so we don't spam.
758 let mut truncation_warned = false;
759 // Held-back Done event: GitCode-style gateways emit usage
760 // in a chunk AFTER finish_reason. We capture finish_reason
761 // here, keep parsing for the trailing usage chunk, and
762 // emit this on `[DONE]` (or stream end) so token counters
763 // and the truncation detector see real numbers.
764 let mut pending_finish: Option<crate::stream::StreamEvent> = None;
765
766 loop {
767 // 120s idle timeout: if no data arrives for 2 minutes, treat as dead connection.
768 let chunk = match tokio::time::timeout(
769 std::time::Duration::from_secs(120),
770 byte_stream.next(),
771 )
772 .await
773 {
774 Ok(Some(chunk)) => chunk,
775 Ok(None) => break, // stream ended
776 Err(_) => {
777 let _ = tx.send(Ok(StreamEvent::Error(
778 "Stream timeout: no data received for 120 seconds".to_string(),
779 )));
780 return;
781 }
782 };
783
784 match chunk {
785 Ok(bytes) => {
786 // TEMP wire-dump (response side): append raw
787 // bytes as they arrive so we can inspect the
788 // exact SSE stream litellm sent back.
789 if let Some(ref p) = resp_dump_path {
790 use std::io::Write;
791 if let Ok(mut f) = std::fs::OpenOptions::new()
792 .create(true)
793 .append(true)
794 .open(p)
795 {
796 let _ = f.write_all(&bytes);
797 }
798 }
799 byte_buffer.extend_from_slice(&bytes);
800 }
801 Err(e) => {
802 // Safe-to-retry condition: stream opened but no SSE
803 // `data:` line was parsed yet. Common with
804 // self-hosted endpoints that open the response,
805 // immediately fail to start streaming, and reset
806 // the chunked body — at this point nothing has
807 // been committed downstream, so a fresh request
808 // is equivalent to a first attempt.
809 if !saw_data_line && attempt < MAX_STREAM_ATTEMPTS {
810 continue 'retry;
811 }
812 let _ = tx.send(Ok(StreamEvent::Error(humanise_stream_error(&e))));
813 return;
814 }
815 }
816
817 // Convert bytes to string, keeping incomplete UTF-8 sequences for next chunk
818 let text = match String::from_utf8(byte_buffer.clone()) {
819 Ok(s) => {
820 byte_buffer.clear();
821 s
822 }
823 Err(e) => {
824 let valid_len = e.utf8_error().valid_up_to();
825 if valid_len == 0 {
826 // No valid UTF-8 yet, wait for more bytes
827 continue;
828 }
829 let valid = String::from_utf8_lossy(&byte_buffer[..valid_len]).to_string();
830 byte_buffer = byte_buffer[valid_len..].to_vec();
831 valid
832 }
833 };
834
835 buffer.push_str(&text);
836
837 while let Some(pos) = buffer.find('\n') {
838 let line = buffer[..pos].trim().to_string();
839 buffer = buffer[pos + 1..].to_string();
840
841 if line.starts_with("data:") {
842 saw_data_line = true;
843 let data = line.strip_prefix("data:").unwrap().trim();
844 if data == "[DONE]" {
845 if let Some(usage) = last_usage.take() {
846 let _ = tx.send(Ok(StreamEvent::Usage(usage)));
847 }
848 // Emit the held-back Done from finish_reason if present;
849 // otherwise default to a non-truncated Done (e.g. providers
850 // that close the stream with [DONE] but never emit a
851 // finish_reason field).
852 let done = pending_finish
853 .take()
854 .unwrap_or(StreamEvent::Done { truncated: false });
855 let _ = tx.send(Ok(done));
856 return;
857 }
858 if let Ok(chunk) = serde_json::from_str::<ChatChunk>(data) {
859 saw_valid_chunk = true;
860 // Store usage — don't emit yet. Some providers send cumulative
861 // usage in multiple chunks; we only want the final value.
862 if let Some(usage) = &chunk.usage {
863 // Extract cached tokens from whichever field the provider uses
864 let cached = usage
865 .prompt_cache_hit_tokens
866 .or(usage.cached_tokens)
867 .or_else(|| {
868 usage
869 .prompt_tokens_details
870 .as_ref()
871 .and_then(|d| d.cached_tokens)
872 })
873 .unwrap_or(0);
874 let pt = usage.prompt_tokens.unwrap_or(0);
875 if !truncation_warned {
876 if let Some(ratio) =
877 check_truncation(body_content_chars, pt)
878 {
879 truncation_warned = true;
880 let msg = format!(
881 "Provider may be truncating input on \
882 model={}: {} content chars vs {} reported \
883 prompt_tokens (ratio {:.1} chars/token; \
884 normal mixed-content runs 2-4). If turns \
885 spiral, the proxy may be capping context.",
886 provider_label,
887 body_content_chars,
888 pt,
889 ratio,
890 );
891 let _ = tx.send(Ok(StreamEvent::Warning(msg)));
892 }
893 }
894 last_usage = Some(crate::stream::TokenUsage {
895 prompt_tokens: pt,
896 completion_tokens: usage.completion_tokens.unwrap_or(0),
897 cached_tokens: cached,
898 });
899 }
900 for choice in chunk.choices {
901 if let Some(content) = choice.delta.content {
902 if !content.is_empty() {
903 content_chunks += 1;
904 accumulated_content.push_str(&content);
905 let _ = tx.send(Ok(StreamEvent::Delta(content)));
906 }
907 }
908 if let Some(reasoning) = choice.delta.reasoning_content {
909 if !reasoning.is_empty() {
910 let _ = tx.send(Ok(StreamEvent::Reasoning(reasoning)));
911 }
912 }
913 if let Some(delta_tcs) = &choice.delta.tool_calls {
914 for tc in delta_tcs {
915 let idx = tc.index.unwrap_or(0);
916 // Grow the vec if this is a new tool call index
917 while tool_calls.len() <= idx {
918 tool_calls.push((
919 String::new(),
920 String::new(),
921 String::new(),
922 ));
923 }
924 let entry = &mut tool_calls[idx];
925 if let Some(id) = &tc.id {
926 // Some providers (e.g., ModelScope) send empty string id
927 // in incremental tool call chunks. Only emit ToolCallStart
928 // for non-empty ids.
929 if !id.is_empty() {
930 entry.0 = id.clone();
931 if let Some(func) = &tc.function {
932 entry.1 = func.name.clone().unwrap_or_default();
933 }
934 let _ = tx.send(Ok(StreamEvent::ToolCallStart {
935 id: entry.0.clone(),
936 name: entry.1.clone(),
937 }));
938 }
939 }
940 if let Some(func) = &tc.function {
941 if let Some(args) = &func.arguments {
942 entry.2.push_str(args);
943 let _ = tx.send(Ok(StreamEvent::ToolCallDelta(
944 args.clone(),
945 )));
946 }
947 }
948 }
949 }
950 if let Some(ref reason) = choice.finish_reason {
951 // Don't return here — flush tool_calls + remember the
952 // finish_reason, then keep parsing until [DONE]. Some
953 // gateways (GitCode litellm proxy on glm-5 confirmed
954 // 5/8) send `usage` in a chunk AFTER `finish_reason`,
955 // and a previous version of this code returned on
956 // finish_reason → usage chunk silently dropped → both
957 // the token counters and the truncation detector saw
958 // 0 prompt_tokens for entire sessions.
959 match reason.as_str() {
960 "tool_calls" => {
961 for (id, name, args) in &tool_calls {
962 let _ = tx.send(Ok(StreamEvent::ToolCallDone(
963 crate::tool::ToolCall {
964 id: id.clone(),
965 name: name.clone(),
966 arguments: args.clone(),
967 },
968 )));
969 }
970 tool_calls.clear();
971 pending_finish =
972 Some(StreamEvent::Done { truncated: false });
973 }
974 "length" | "max_tokens" => {
975 // Model hit token limit — flush partial tool
976 // calls so downstream sees what the model was
977 // attempting. (Args may be malformed;
978 // `repair_tool_args` + write.rs friendly errors
979 // handle that.)
980 for (id, name, args) in &tool_calls {
981 let _ = tx.send(Ok(StreamEvent::ToolCallDone(
982 crate::tool::ToolCall {
983 id: id.clone(),
984 name: name.clone(),
985 arguments: args.clone(),
986 },
987 )));
988 }
989 tool_calls.clear();
990 pending_finish =
991 Some(StreamEvent::Done { truncated: true });
992 }
993 "stop" | _ => {
994 pending_finish =
995 Some(StreamEvent::Done { truncated: false });
996 }
997 }
998 }
999 }
1000 } else if invalid_chunk_samples.len() < 3 && !data.is_empty() {
1001 invalid_chunk_samples.push(sample_for_error(data));
1002 }
1003 }
1004 }
1005 }
1006
1007 let tail = buffer.trim();
1008 if !tail.is_empty() {
1009 if let Some(events) = parse_nonstream_response(tail) {
1010 for event in events {
1011 let _ = tx.send(Ok(event));
1012 }
1013 return;
1014 }
1015 }
1016
1017 if saw_data_line && !saw_valid_chunk {
1018 let detail = if invalid_chunk_samples.is_empty() {
1019 "no chunk could be parsed".to_string()
1020 } else {
1021 format!("samples: {}", invalid_chunk_samples.join(" | "))
1022 };
1023 let _ = tx.send(Ok(StreamEvent::Error(format!(
1024 "Provider returned an unparseable OpenAI-compatible stream ({})",
1025 detail
1026 ))));
1027 return;
1028 }
1029
1030 if !tail.is_empty() {
1031 let _ = tx.send(Ok(StreamEvent::Error(format!(
1032 "Provider returned a non-SSE response AtomCode could not parse: {}",
1033 sample_for_error(tail)
1034 ))));
1035 return;
1036 }
1037
1038 // ── Stream ended without close marker ──
1039 // Reaching here means we parsed valid SSE chunks but the
1040 // stream's `bytes_stream.next()` returned `Ok(None)` (clean
1041 // close at TCP/HTTP level) WITHOUT either:
1042 // a. a `data: [DONE]` line (handled at line ~519, returns
1043 // with truncated=false), or
1044 // b. a `finish_reason` of `stop` / `length` / `tool_calls`
1045 // (handled inline in the chunk parser around line ~610,
1046 // returns with the appropriate truncated flag).
1047 //
1048 // Observed three times across May 2026 atomgr/atomcode
1049 // sessions on the self-hosted glm-5.1 endpoint:
1050 // - 5/4 21:21 Turn 23 — `error decoding response body` (Err
1051 // path, separately fixed by mid-stream retry).
1052 // - 5/5 10:06 Turn 10 — text response stopped at "1.\n"
1053 // mid-list, no close marker (this path).
1054 // - 5/5 19:37 Turn 72-73 — markdown table truncated
1055 // mid-row, no close marker (this path).
1056 //
1057 // Pre-fix this branch emitted `Done { truncated: false }`,
1058 // making the agent loop treat the partial output as a
1059 // complete response and `finish_turn(Natural)` immediately.
1060 // The user saw a cut-off table / list with no error, no
1061 // retry, and no indication that anything went wrong.
1062 //
1063 // Post-fix (this commit):
1064 // 1. Flush any in-flight tool calls so partial-args don't
1065 // silently disappear (mirrors the `length` branch's
1066 // handling at line ~622).
1067 // 2. Emit a TextDelta marker so the user (and datalog) can
1068 // see why the response was cut. Goes through the
1069 // normal stream_filter path; doesn't pollute model
1070 // context with control sequences.
1071 // 3. Emit `Done { truncated: true }` so the agent loop's
1072 // existing retry-with-resume path (`agent/mod.rs:1854`,
1073 // `if truncated && retry_count < 1`) injects the
1074 // "Output limit hit. … resume where you left off"
1075 // hint and triggers a continuation turn.
1076 // If finish_reason had already arrived (we held the Done
1077 // back waiting for trailing usage), don't downgrade it to
1078 // a truncated=true close — the model finished cleanly and
1079 // the stream just lacked a [DONE] marker. Flush any
1080 // buffered usage first so token counters are honest.
1081 if let Some(usage) = last_usage.take() {
1082 let _ = tx.send(Ok(StreamEvent::Usage(usage)));
1083 }
1084 if let Some(done) = pending_finish.take() {
1085 let _ = tx.send(Ok(done));
1086 return;
1087 }
1088
1089 // Abrupt close discriminator: if the model never made
1090 // tool-call progress AND the body arrived as a single
1091 // burst (≤ 2 content chunks), this wasn't a real
1092 // truncation — gateways like GitCode's litellm proxy
1093 // stream a single error blob (「请求负载过高,请稍后
1094 // 再试」 / a verbose `litellm.InternalServerError` JSON
1095 // envelope) and slam the connection closed without a
1096 // [DONE] marker. Promoting that to `truncated=true`
1097 // makes the agent inject "resume where you left off"
1098 // and retry, which renders the SAME error a second
1099 // time (see issue: GLM-5.1 网关限流双重渲染 /
1100 // LiteLLM 429 cooldown_list 双重渲染).
1101 // Diverting to `StreamEvent::Error` instead lets the
1102 // agent's `is_rate_limited` retry path (with 3-30s
1103 // backoff) handle it correctly — or, if it's an
1104 // unfamiliar error string, surface it once and stop.
1105 //
1106 // Discriminator is `content_chunks <= 2` alone: real
1107 // streamed completions emit many small deltas (tens
1108 // to hundreds of chunks), while gateway errors arrive
1109 // as 1-2 large chunks regardless of payload size
1110 // (Chinese 10-80-char banners or 700+-char LiteLLM
1111 // JSON envelopes both qualify). The earlier ≤ 200
1112 // char cap let the LiteLLM JSON shape slip through
1113 // to the truncated-retry path and caused the double
1114 // render. The real risk — misclassifying a 1-chunk
1115 // legit reply — is mitigated by the fact that
1116 // successful completions virtually always emit
1117 // `[DONE]`; reaching this branch already means the
1118 // stream ended anomalously.
1119 let trimmed = accumulated_content.trim();
1120 let looks_like_gateway_error =
1121 tool_calls.is_empty() && content_chunks <= 2 && !trimmed.is_empty();
1122 if looks_like_gateway_error {
1123 let _ = tx.send(Ok(StreamEvent::Error(trimmed.to_string())));
1124 return;
1125 }
1126
1127 for (id, name, args) in &tool_calls {
1128 let _ = tx.send(Ok(StreamEvent::ToolCallDone(
1129 crate::tool::ToolCall {
1130 id: id.clone(),
1131 name: name.clone(),
1132 arguments: args.clone(),
1133 },
1134 )));
1135 }
1136 tool_calls.clear();
1137 let _ = tx.send(Ok(StreamEvent::Delta(
1138 "\n[stream ended without close marker — response above may be incomplete]\n"
1139 .to_string(),
1140 )));
1141 let _ = tx.send(Ok(StreamEvent::Done { truncated: true }));
1142 return;
1143 }
1144 });
1145
1146 Ok(Box::pin(
1147 tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
1148 ))
1149 }
1150
1151 fn model_name(&self) -> &str {
1152 &self.model
1153 }
1154
1155 fn reasoning_history_policy(&self) -> ReasoningPolicy {
1156 // Explicit user override wins over the name/url heuristic so a new
1157 // provider quirk can be worked around via config.toml without a
1158 // code change.
1159 if let Some(p) = self.reasoning_history_override {
1160 return p;
1161 }
1162 Self::derive_reasoning_policy(&self.model, &self.base_url)
1163 }
1164}
1165
/// Repair common JSON issues in tool call arguments from weak models.
///
/// Repairs, in order:
/// 1. Surrounding whitespace and a Markdown code fence are stripped.
/// 2. A payload that starts with neither `{` nor `[` is wrapped in `{…}`.
/// 3. Commas sitting directly before a closing `}` / `]` (optionally
///    separated from it by whitespace) are removed.
/// 4. Containers still open at the end of the input are closed in nesting
///    order (`]` for arrays, `}` for objects).
///
/// Steps 3 and 4 are string-aware: characters inside a JSON string literal
/// (including escaped quotes) are copied through untouched, so argument
/// values such as source code containing `,]`, `,}` or a lone `{` are never
/// altered and never skew the balance count.
///
/// An unterminated string literal is deliberately NOT closed: missing
/// closers are still appended after it, so the result remains invalid JSON
/// rather than presenting a cut-off value as if it were complete.
fn repair_tool_args(s: &str) -> String {
    /// Drops a trailing run of commas/whitespace from `buf`, but only when
    /// that run actually contains a comma; pure whitespace is left alone.
    fn drop_trailing_commas(buf: &mut String) {
        let kept = buf
            .trim_end_matches(|c: char| c == ',' || c.is_whitespace())
            .len();
        if buf[kept..].contains(',') {
            buf.truncate(kept);
        }
    }

    let mut r = s.trim().to_string();

    // Remove markdown code fences (opening line incl. language tag, then
    // the closing fence).
    if r.starts_with("```") {
        r = r.lines().skip(1).collect::<Vec<_>>().join("\n");
    }
    if r.ends_with("```") {
        r = r.strip_suffix("```").unwrap_or(&r).trim().to_string();
    }

    // Wrap bare `"key": value` payloads first so that a trailing comma
    // exposed by the wrapping is cleaned up by the scan below.
    if !r.starts_with('{') && !r.starts_with('[') {
        r = format!("{{{}}}", r);
    }

    let mut out = String::with_capacity(r.len() + 4);
    // Closers still owed for every container opened so far, innermost last.
    let mut pending: Vec<char> = Vec::new();
    let mut in_string = false;
    let mut escaped = false;

    for ch in r.chars() {
        if in_string {
            // Inside a string literal nothing is structural; just track
            // escapes so that `\"` does not end the literal.
            out.push(ch);
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => pending.push('}'),
            '[' => pending.push(']'),
            '}' | ']' => {
                drop_trailing_commas(&mut out);
                // Unwind to the matching opener; a closer with no opener
                // is passed through unchanged.
                if let Some(pos) = pending.iter().rposition(|&c| c == ch) {
                    pending.truncate(pos);
                }
            }
            _ => {}
        }
        out.push(ch);
    }

    // Truncated output such as `{"a": 1,` — drop the dangling comma before
    // appending the missing closers.
    if !in_string && !pending.is_empty() {
        drop_trailing_commas(&mut out);
    }
    out.extend(pending.iter().rev());

    out
}
1201
/// Turns a user-supplied base URL into the full chat-completions endpoint.
///
/// All trailing slashes are stripped first; `/chat/completions` is then
/// appended unless the URL already ends with it:
/// - "https://api.example.com/v1/" → "https://api.example.com/v1/chat/completions"
/// - "https://api.example.com/v1/chat/completions" → unchanged
/// - "https://api.example.com" → "https://api.example.com/chat/completions"
///   (no `/v1` segment is inserted)
fn normalize_base_url(base: &str) -> String {
    const ENDPOINT: &str = "/chat/completions";

    let stem = base.trim_end_matches('/');
    let mut url = String::from(stem);
    if !stem.ends_with(ENDPOINT) {
        url.push_str(ENDPOINT);
    }
    url
}
1215
1216fn parse_nonstream_response(body: &str) -> Option<Vec<StreamEvent>> {
1217 let response: ChatCompletionResponse = serde_json::from_str(body).ok()?;
1218 let mut events = Vec::new();
1219
1220 if let Some(usage) = response.usage {
1221 let cached = usage
1222 .prompt_cache_hit_tokens
1223 .or(usage.cached_tokens)
1224 .or_else(|| {
1225 usage
1226 .prompt_tokens_details
1227 .as_ref()
1228 .and_then(|d| d.cached_tokens)
1229 })
1230 .unwrap_or(0);
1231 events.push(StreamEvent::Usage(crate::stream::TokenUsage {
1232 prompt_tokens: usage.prompt_tokens.unwrap_or(0),
1233 completion_tokens: usage.completion_tokens.unwrap_or(0),
1234 cached_tokens: cached,
1235 }));
1236 }
1237
1238 for choice in response.choices {
1239 if let Some(message) = choice.message {
1240 if let Some(content) = message.content {
1241 if !content.is_empty() {
1242 events.push(StreamEvent::Delta(content));
1243 }
1244 }
1245 if let Some(reasoning) = message.reasoning_content {
1246 if !reasoning.is_empty() {
1247 events.push(StreamEvent::Reasoning(reasoning));
1248 }
1249 }
1250 }
1251
1252 let truncated = matches!(
1253 choice.finish_reason.as_deref(),
1254 Some("length") | Some("max_tokens")
1255 );
1256 events.push(StreamEvent::Done { truncated });
1257 }
1258
1259 if events.is_empty() {
1260 None
1261 } else {
1262 Some(events)
1263 }
1264}
1265
/// Produces a short single-line excerpt of `s` for embedding in an error
/// message: newlines become the two-character sequence `\n`, and anything
/// beyond the first 160 characters is replaced by a trailing `...`.
fn sample_for_error(s: &str) -> String {
    const MAX_CHARS: usize = 160;

    let flat = s.replace('\n', "\\n");
    // The byte offset of the 161st character (if any) is the cut point.
    match flat.char_indices().nth(MAX_CHARS) {
        Some((cut, _)) => {
            let mut sample = String::from(&flat[..cut]);
            sample.push_str("...");
            sample
        }
        None => flat,
    }
}
1274
1275/// Translate a `reqwest::Error` from the streaming body into something a
1276/// non-engineer user can act on. The bare `Display` for these errors is
1277/// shaped for HTTP-protocol context ("error decoding response body",
1278/// "operation timed out") and lands in the chat as gibberish — users
1279/// can't tell whether to retry, switch providers, or wait. Three buckets:
1280///
1281/// 1. `is_decode()` — the most common self-hosted-endpoint failure: the
1282/// server cut the chunked body mid-flight (worker timeout, OOM,
1283/// upstream proxy reset). Recoverable by resending; tell the user so.
1284/// 2. `is_timeout()` — request-level timeout. Same recovery signal.
1285/// 3. `is_connect()` — TCP connect failed late (rare mid-stream, but
1286/// possible on connection-pool churn). Recoverable.
1287/// Everything else falls through to the bare error text.
1288pub(crate) fn humanise_stream_error(e: &reqwest::Error) -> String {
1289 if e.is_decode() {
1290 format!(
1291 "Endpoint terminated the response stream mid-flight ({}). \
1292 The provider may have hit a worker timeout or upstream-proxy \
1293 read limit on a long generation. Try resending the message; \
1294 if it recurs, increase the endpoint's read/write timeouts \
1295 or split the request into smaller chunks.",
1296 e
1297 )
1298 } else if e.is_timeout() {
1299 format!(
1300 "Stream timeout ({}). The provider didn't deliver chunks \
1301 within the configured window. Try resending or check provider \
1302 status.",
1303 e
1304 )
1305 } else if e.is_connect() {
1306 format!(
1307 "Connection lost mid-stream ({}). Try resending; check \
1308 network reachability if it persists.",
1309 e
1310 )
1311 } else {
1312 format!("Stream error: {}", e)
1313 }
1314}
1315
1316/// Sum of every message's `content` length plus every tool_call's
1317/// `arguments` length. Used as the denominator for the
1318/// chars/prompt_tokens ratio that flags a silently-truncating proxy.
1319/// We deliberately ignore JSON keys/braces — those are constant overhead
1320/// across all bodies and would dilute the signal.
1321fn sum_message_content_chars(body: &serde_json::Value) -> usize {
1322 let mut total = 0usize;
1323 let Some(msgs) = body.get("messages").and_then(|m| m.as_array()) else {
1324 return 0;
1325 };
1326 for m in msgs {
1327 if let Some(s) = m.get("content").and_then(|c| c.as_str()) {
1328 total = total.saturating_add(s.len());
1329 } else if let Some(arr) = m.get("content").and_then(|c| c.as_array()) {
1330 // Vision multipart content: sum text fragments only (image
1331 // payloads are URL-or-base64 strings the model doesn't read
1332 // as text tokens, so counting them inflates the ratio).
1333 for part in arr {
1334 if let Some(s) = part.get("text").and_then(|t| t.as_str()) {
1335 total = total.saturating_add(s.len());
1336 }
1337 }
1338 }
1339 if let Some(tcs) = m.get("tool_calls").and_then(|t| t.as_array()) {
1340 for tc in tcs {
1341 if let Some(args) = tc
1342 .get("function")
1343 .and_then(|f| f.get("arguments"))
1344 .and_then(|a| a.as_str())
1345 {
1346 total = total.saturating_add(args.len());
1347 }
1348 }
1349 }
1350 }
1351 total
1352}
1353
/// Decides whether the reported prompt token count is implausibly small for
/// the amount of content that was sent.
///
/// Yields `Some(ratio)` (chars per token) when the ratio is strictly above
/// 6.0. Normal tokenizers across mixed CJK/English/code run 2-4
/// chars/token, so a higher figure most plausibly means the proxy capped
/// the input and reported tokens for the truncated view.
///
/// Yields `None` when the ratio is at or below the threshold, when
/// `prompt_tokens` is zero, or when fewer than 4,000 content chars were
/// sent (tiny requests produce a noisy ratio).
fn check_truncation(content_chars: usize, prompt_tokens: usize) -> Option<f64> {
    const MIN_CONTENT_CHARS: usize = 4_000;
    const SUSPICIOUS_RATIO: f64 = 6.0;

    if prompt_tokens == 0 || content_chars < MIN_CONTENT_CHARS {
        return None;
    }

    let ratio = content_chars as f64 / prompt_tokens as f64;
    (ratio > SUSPICIOUS_RATIO).then_some(ratio)
}
1374
1375#[cfg(test)]
1376mod tests {
1377 use super::{
1378 check_truncation, parse_nonstream_response, sample_for_error, sum_message_content_chars,
1379 OpenAiProvider, ReasoningPolicy,
1380 };
1381 use crate::conversation::message::{ImagePart, Message, MessageContent, Role};
1382 use crate::stream::StreamEvent;
1383
/// Pins the wire format emitted for `MessageContent::MultiPart`: it has to
/// follow OpenAI's vision schema — `role: user`, a `content` array, every
/// block tagged with `type` ("image_url" or "text"), image(s) first and
/// text last. Guards against a field rename or order flip silently
/// breaking every vision-capable provider.
#[test]
fn multipart_serialises_to_openai_vision_schema() {
    let picture = ImagePart {
        media_type: "image/png".to_string(),
        data: "AAAA".to_string(),
    };
    let input = [Message {
        role: Role::User,
        content: MessageContent::MultiPart {
            text: Some("describe this".to_string()),
            images: vec![picture],
        },
    }];

    let out = OpenAiProvider::format_messages(&input, ReasoningPolicy::Exclude, true);
    assert_eq!(out.len(), 1, "one message in, one out");

    let wire = &out[0];
    assert_eq!(wire["role"], "user");
    let blocks = wire["content"].as_array().expect("content must be an array");
    assert_eq!(blocks.len(), 2, "image + text = 2 blocks");

    // First block is the image: `type` + `image_url`, and no `text` key.
    let image_block = &blocks[0];
    assert_eq!(image_block["type"], "image_url");
    assert_eq!(image_block["image_url"]["url"], "data:image/png;base64,AAAA");
    assert!(
        image_block.get("text").is_none(),
        "image block must not have text field"
    );

    // Second block is the caption: `type: text` + `text: <string>`.
    let text_block = &blocks[1];
    assert_eq!(text_block["type"], "text");
    assert_eq!(text_block["text"], "describe this");
}
1420
/// With several images attached, every image block precedes the text
/// block and the images keep their attachment order.
#[test]
fn multipart_preserves_image_order_then_text() {
    let attachments = vec![
        ImagePart { media_type: "image/png".into(), data: "FIRST".into() },
        ImagePart { media_type: "image/jpeg".into(), data: "SECOND".into() },
    ];
    let input = [Message {
        role: Role::User,
        content: MessageContent::MultiPart {
            text: Some("compare".to_string()),
            images: attachments,
        },
    }];

    let out = OpenAiProvider::format_messages(&input, ReasoningPolicy::Exclude, true);
    let blocks = out[0]["content"].as_array().unwrap();

    assert_eq!(blocks.len(), 3);
    assert_eq!(blocks[0]["image_url"]["url"], "data:image/png;base64,FIRST");
    assert_eq!(blocks[1]["image_url"]["url"], "data:image/jpeg;base64,SECOND");
    assert_eq!(blocks[2]["type"], "text");
    assert_eq!(blocks[2]["text"], "compare");
}
1443
/// A multipart message without a caption serialises to just the image
/// block — no empty text block is appended.
#[test]
fn multipart_without_text_omits_text_block() {
    let only_image = ImagePart { media_type: "image/png".into(), data: "X".into() };
    let input = [Message {
        role: Role::User,
        content: MessageContent::MultiPart {
            text: None,
            images: vec![only_image],
        },
    }];

    let out = OpenAiProvider::format_messages(&input, ReasoningPolicy::Exclude, true);
    let blocks = out[0]["content"].as_array().unwrap();

    assert_eq!(blocks.len(), 1, "single image block, no text block");
    assert_eq!(blocks[0]["type"], "image_url");
}
1460
/// Regression: a user pasted an image while on a vision-capable model, got
/// a reply, then switched via `/model` to a text-only model (GLM-5.1) and
/// sent a follow-up. The history still held the `MultiPart` user turn, and
/// sending `content: [...]` upstream was rejected with
/// `ModelArts.81001 message[N].content[0] has invalid field(s): text, type`.
///
/// With `supports_vision = false` the provider must flatten `MultiPart`
/// into a plain string: the caption (including the `[Image #N]` marker)
/// survives, the image bytes are dropped.
#[test]
fn multipart_degrades_to_text_when_target_is_text_only() {
    let pasted = ImagePart { media_type: "image/png".into(), data: "AAAA".into() };
    let history = [Message {
        role: Role::User,
        content: MessageContent::MultiPart {
            text: Some("[Image #1] 这是什么图啊".into()),
            images: vec![pasted],
        },
    }];

    let out = OpenAiProvider::format_messages(&history, ReasoningPolicy::Exclude, false);
    assert_eq!(out.len(), 1);

    let wire = &out[0];
    assert_eq!(wire["role"], "user");

    // A flat string is mandatory — an array is a 400 against text-only
    // proxies (ModelArts, ZhipuAI, etc.).
    let payload = &wire["content"];
    assert!(
        payload.is_string(),
        "text-only target must receive content as a string, got: {}",
        payload
    );

    let flattened = payload.as_str().unwrap();
    assert!(
        flattened.contains("这是什么图啊"),
        "user's caption must survive degradation: {:?}",
        flattened
    );
    // The image data must not leak into the degraded payload.
    assert!(
        !flattened.contains("data:image"),
        "image bytes must not appear in degraded payload: {:?}",
        flattened
    );
}
1505
/// An image-only `MultiPart` (no caption) sent to a text-only target must
/// still degrade to non-empty content: some proxies reject an empty user
/// message ("messages must contain a non-empty content"), so a placeholder
/// keeps the conversation valid.
#[test]
fn multipart_text_only_target_uses_placeholder_when_caption_empty() {
    let pasted = ImagePart { media_type: "image/png".into(), data: "X".into() };
    let history = [Message {
        role: Role::User,
        content: MessageContent::MultiPart {
            text: None,
            images: vec![pasted],
        },
    }];

    let out = OpenAiProvider::format_messages(&history, ReasoningPolicy::Exclude, false);
    let flattened = out[0]["content"].as_str().expect("string content");

    assert!(!flattened.is_empty(), "must be non-empty placeholder");
}
1524
/// A plain non-streamed completion yields usage, then the text delta, then
/// a non-truncated `Done`.
#[test]
fn parses_nonstream_text_response() {
    let payload = r#"{
        "choices": [
            {
                "message": { "content": "hello" },
                "finish_reason": "stop"
            }
        ],
        "usage": { "prompt_tokens": 11, "completion_tokens": 3 }
    }"#;

    let events = parse_nonstream_response(payload).expect("should parse non-stream response");

    assert!(matches!(&events[0], StreamEvent::Usage(_)));
    assert!(matches!(&events[1], StreamEvent::Delta(text) if text == "hello"));
    assert!(matches!(&events[2], StreamEvent::Done { truncated: false }));
}
1542
/// A response carrying only `reasoning_content` and a `length` finish
/// reason yields the reasoning event followed by a truncated `Done`.
#[test]
fn parses_nonstream_reasoning_only_response() {
    let payload = r#"{
        "choices": [
            {
                "message": { "reasoning_content": "thinking" },
                "finish_reason": "length"
            }
        ]
    }"#;

    let events = parse_nonstream_response(payload).expect("should parse non-stream response");

    assert!(matches!(&events[0], StreamEvent::Reasoning(text) if text == "thinking"));
    assert!(matches!(&events[1], StreamEvent::Done { truncated: true }));
}
1558
/// A newline in the input is rendered as the literal two characters `\n`.
#[test]
fn sample_for_error_flattens_newlines() {
    let flattened = sample_for_error("a\nb");
    assert_eq!(flattened, "a\\nb");
}
1563
1564 // ── ReasoningPolicy: model / base_url routing ──
1565
/// Kimi models on the Moonshot / Kimi endpoints resolve to `Include`.
#[test]
fn reasoning_policy_moonshot_kimi_routes_to_include() {
    use super::{OpenAiProvider, ReasoningPolicy};

    let cases = [
        ("kimi-k2-thinking", "https://api.moonshot.cn/v1"),
        ("kimi-k2.6", "https://api.kimi.com/v1"),
    ];
    for (model, base_url) in cases {
        assert_eq!(
            OpenAiProvider::derive_reasoning_policy(model, base_url),
            ReasoningPolicy::Include,
        );
    }
}
1581
/// DeepSeek-R1 rejects a request that echoes `reasoning_content` back, so
/// both reasoner model names must resolve to `Exclude`.
#[test]
fn reasoning_policy_deepseek_reasoner_routes_to_exclude() {
    use super::{OpenAiProvider, ReasoningPolicy};

    let base_url = "https://api.deepseek.com/v1";
    for model in ["deepseek-reasoner", "deepseek-r1"] {
        assert_eq!(
            OpenAiProvider::derive_reasoning_policy(model, base_url),
            ReasoningPolicy::Exclude,
        );
    }
}
1598
/// DeepSeek V4 thinking mode needs `reasoning_content` echoed back on
/// assistant tool_call messages — the opposite of V3/R1 — so V4 model
/// names must resolve to `Include`.
#[test]
fn reasoning_policy_deepseek_v4_routes_to_include() {
    use super::{OpenAiProvider, ReasoningPolicy};

    let base_url = "https://api.deepseek.com";
    for model in ["deepseek-v4-pro", "deepseek-v4"] {
        assert_eq!(
            OpenAiProvider::derive_reasoning_policy(model, base_url),
            ReasoningPolicy::Include,
        );
    }
}
1613
/// An explicit `reasoning_history` config value must beat the model/URL
/// heuristic in both directions.
#[test]
fn reasoning_history_config_override_wins_over_heuristic() {
    use super::OpenAiProvider;
    use crate::config::provider::ProviderConfig;
    use crate::provider::{LlmProvider, ReasoningPolicy};

    // deepseek-v4-pro is routed to Include by the heuristic; the config
    // value "exclude" has to win.
    let forced_exclude = ProviderConfig {
        provider_type: "openai".into(),
        api_key: Some("sk-test".into()),
        model: "deepseek-v4-pro".into(),
        base_url: Some("https://api.deepseek.com".into()),
        system_prompt: None,
        user_agent: None,
        context_window: 128_000,
        max_tokens: None,
        thinking_type: None,
        thinking_keep: None,
        reasoning_history: Some("exclude".into()),
        thinking_enabled: None,
        thinking_budget: None,
        skip_tls_verify: false,
        ephemeral: false,
    };
    let excluding = OpenAiProvider::new(&forced_exclude).expect("provider builds");
    assert_eq!(excluding.reasoning_history_policy(), ReasoningPolicy::Exclude);

    // The other direction: "include" on a plain OpenAI model (heuristic =
    // Exclude) forces Include, letting users unblock new providers without
    // a code change.
    let forced_include = ProviderConfig {
        model: "gpt-4o".into(),
        base_url: Some("https://api.openai.com/v1".into()),
        reasoning_history: Some("include".into()),
        ..forced_exclude
    };
    let including = OpenAiProvider::new(&forced_include).expect("provider builds");
    assert_eq!(including.reasoning_history_policy(), ReasoningPolicy::Include);
}
1653
/// A typo in `reasoning_history` must be rejected when the provider is
/// built, with an error naming both the field and the bad value, rather
/// than surfacing later as a silent policy-mismatch 400 mid-turn.
#[test]
fn reasoning_history_config_invalid_value_fails_fast() {
    use super::OpenAiProvider;
    use crate::config::provider::ProviderConfig;

    let bad_cfg = ProviderConfig {
        provider_type: "openai".into(),
        api_key: Some("sk-test".into()),
        model: "gpt-4o".into(),
        base_url: Some("https://api.openai.com/v1".into()),
        system_prompt: None,
        user_agent: None,
        context_window: 128_000,
        max_tokens: None,
        thinking_type: None,
        thinking_keep: None,
        reasoning_history: Some("always".into()),
        thinking_enabled: None,
        thinking_budget: None,
        skip_tls_verify: false,
        ephemeral: false,
    };

    let Err(err) = OpenAiProvider::new(&bad_cfg) else {
        panic!("bad reasoning_history value must reject");
    };

    let msg = err.to_string();
    let names_field = msg.contains("reasoning_history");
    let names_value = msg.contains("always");
    assert!(
        names_field && names_value,
        "error must name the bad field and value, got: {msg}"
    );
}
1688
/// Endpoints the heuristic does not recognise fall back to the safe
/// default of not emitting reasoning (`Exclude`).
#[test]
fn reasoning_policy_default_is_exclude() {
    use super::{OpenAiProvider, ReasoningPolicy};

    let cases = [
        ("gpt-4o", "https://api.openai.com/v1"),
        ("some-custom-model", "https://example.com/v1"),
    ];
    for (model, base_url) in cases {
        assert_eq!(
            OpenAiProvider::derive_reasoning_policy(model, base_url),
            ReasoningPolicy::Exclude,
        );
    }
}
1702
1703 // ── format_messages: reasoning_content emission per policy ──
1704
1705 fn atc_message(reasoning: Option<&str>) -> crate::conversation::message::Message {
1706 use crate::conversation::message::{Message, MessageContent, Role};
1707 use crate::tool::ToolCall;
1708 Message {
1709 role: Role::Assistant,
1710 content: MessageContent::AssistantWithToolCalls {
1711 text: Some("ok".into()),
1712 tool_calls: vec![ToolCall {
1713 id: "c1".into(),
1714 name: "bash".into(),
1715 arguments: "{}".into(),
1716 }],
1717 reasoning_content: reasoning.map(|s| s.to_string()),
1718 thinking_blocks: Vec::new(),
1719 },
1720 }
1721 }
1722
/// Under `Include`, stored reasoning text is sent back verbatim in the
/// `reasoning_content` field.
#[test]
fn format_messages_include_with_some_reasoning_emits_field() {
    use super::{OpenAiProvider, ReasoningPolicy};

    let history = vec![atc_message(Some("thinking text"))];
    let wire = OpenAiProvider::format_messages(&history, ReasoningPolicy::Include, true);

    assert_eq!(wire.len(), 1);
    assert_eq!(wire[0]["reasoning_content"], "thinking text");
}
1731
/// Pins the contract between the send side and the receive side of the
/// reasoning placeholder.
///
/// `TurnRunner::Done` skips reasoning→text promotion when the accumulated
/// reasoning buffer equals exactly the placeholder. `format_messages` must
/// therefore emit the very same byte string — otherwise a buggy gateway
/// echoing it back would slip past that guard and cause silent
/// "(no reasoning recorded) · Nailed it" stops. Both sides go through
/// `REASONING_PLACEHOLDER`.
#[test]
fn placeholder_send_side_matches_shared_constant() {
    use super::{OpenAiProvider, ReasoningPolicy};
    use crate::provider::REASONING_PLACEHOLDER;

    let history = vec![atc_message(None)];
    let wire = OpenAiProvider::format_messages(&history, ReasoningPolicy::Include, true);
    let sent = wire[0]["reasoning_content"].as_str().unwrap();

    assert_eq!(sent, REASONING_PLACEHOLDER);
}
1750
/// With no stored reasoning (cross-provider session, old jsonl from before
/// capture was wired, non-thinking model that tool-called anyway),
/// `Include` must still send a non-empty `reasoning_content`. Kimi only
/// checks that the field exists, but DeepSeek V4 rejects an empty string
/// on tool_call messages, so a short placeholder satisfies both.
#[test]
fn format_messages_include_with_none_reasoning_emits_placeholder() {
    use super::{OpenAiProvider, ReasoningPolicy};

    let history = vec![atc_message(None)];
    let wire = OpenAiProvider::format_messages(&history, ReasoningPolicy::Include, true);
    let sent = wire[0]["reasoning_content"].as_str().unwrap();

    assert!(
        !sent.is_empty(),
        "placeholder must be non-empty for DeepSeek V4"
    );
}
1767
/// Regression for the "second prompt 400" bug.
///
/// DeepSeek V4's tool-call round contract requires ALL reasoning_content
/// from the tool-call turn to be echoed in later requests, including the
/// reasoning behind the FINAL TEXT answer. The `Text` variant does not
/// persist per-turn reasoning, so under `Include` a placeholder is sent.
/// Under `Exclude` the key must be absent entirely — R1 rejects any
/// reasoning_content echo.
#[test]
fn format_messages_include_assistant_text_emits_reasoning_content() {
    use super::{OpenAiProvider, ReasoningPolicy};
    use crate::conversation::message::{Message, MessageContent, Role};

    let history = vec![Message {
        role: Role::Assistant,
        content: MessageContent::Text("当前系统时间是 …".into()),
    }];

    // Include: a non-empty reasoning_content has to be present.
    let included = OpenAiProvider::format_messages(&history, ReasoningPolicy::Include, true);
    assert_eq!(included.len(), 1);
    let echoed = included[0]["reasoning_content"].as_str();
    assert!(
        matches!(echoed, Some(text) if !text.is_empty()),
        "assistant Text under Include must carry a non-empty reasoning_content, got: {}",
        included[0]
    );

    // Exclude (V3/default): the key must not appear at all.
    let excluded = OpenAiProvider::format_messages(&history, ReasoningPolicy::Exclude, true);
    let fields = excluded[0].as_object().unwrap();
    assert!(
        fields.get("reasoning_content").is_none(),
        "Exclude must not add reasoning_content to assistant Text, got: {}",
        excluded[0]
    );
}
1804
#[test]
fn format_messages_include_with_empty_string_reasoning_emits_placeholder() {
    // Companion to the `_none_reasoning_emits_placeholder` test: an
    // empty-string reasoning (stored as "" or decayed from serde) must
    // also be swapped for the non-empty placeholder before sending.
    use super::{OpenAiProvider, ReasoningPolicy};

    let history = vec![atc_message(Some(""))];
    let wire = OpenAiProvider::format_messages(&history, ReasoningPolicy::Include, true);
    let echoed = wire[0]["reasoning_content"].as_str().unwrap();
    let is_blank = echoed.is_empty();
    assert!(!is_blank, "placeholder must replace empty-string reasoning");
}
1819
#[test]
fn format_messages_exclude_omits_reasoning_content_key() {
    // Per the original note, DeepSeek-R1 rejects a request as soon as the
    // reasoning_content key is present, so Exclude must drop the key
    // entirely — even when a value is available.
    use super::{OpenAiProvider, ReasoningPolicy};

    let history = vec![atc_message(Some("should be stripped"))];
    let wire = OpenAiProvider::format_messages(&history, ReasoningPolicy::Exclude, true);
    let first = &wire[0];
    let key_absent = first
        .as_object()
        .unwrap()
        .get("reasoning_content")
        .is_none();
    assert!(
        key_absent,
        "reasoning_content key must be absent under Exclude, got: {}",
        first
    );
}
1837
1838 // ── thinking config → request body ──
1839
#[test]
fn thinking_body_none_when_both_unset() {
    use super::OpenAiProvider;

    // With neither setting present the key must not be emitted at all:
    // some OpenAI-compatible gateways answer 400 on unknown top-level
    // fields, so a missing key is safer than an empty `{}`.
    let body = OpenAiProvider::thinking_body_value(None, None);
    assert!(body.is_none());
}
1847
#[test]
fn thinking_body_disabled_emits_type_only() {
    use super::OpenAiProvider;

    // Only `type` is configured, so only `type` may appear in the body.
    let actual = OpenAiProvider::thinking_body_value(Some("disabled"), None).unwrap();
    let expected = serde_json::json!({"type": "disabled"});
    assert_eq!(actual, expected);
}
1854
#[test]
fn thinking_body_enabled_with_keep_all() {
    use super::OpenAiProvider;

    // K2.6 Preserved Thinking — the reference combination from the Kimi
    // docs: both `type` and `keep` are forwarded.
    let actual = OpenAiProvider::thinking_body_value(Some("enabled"), Some("all")).unwrap();
    let expected = serde_json::json!({"type": "enabled", "keep": "all"});
    assert_eq!(actual, expected);
}
1862
#[test]
fn thinking_fields_roundtrip_via_toml_provider_config() {
    // This is the shape users write in config.toml: flat keys carrying a
    // `thinking_` prefix so each field explains itself.
    use crate::config::provider::ProviderConfig;

    let source = r#"
        type = "openai"
        model = "kimi-k2.6"
        base_url = "https://api.moonshot.cn/v1"
        api_key = "sk-x"
        thinking_type = "enabled"
        thinking_keep = "all"
    "#;
    let parsed = toml::from_str::<ProviderConfig>(source).expect("TOML parse");

    assert_eq!(parsed.thinking_type.as_deref(), Some("enabled"));
    assert_eq!(parsed.thinking_keep.as_deref(), Some("all"));
}
1880
1881 // ── serde backward compat for old session jsonl ──
1882
#[test]
fn old_jsonl_without_reasoning_content_still_deserializes() {
    // Backward compatibility: session jsonl written before the
    // `reasoning_content` field existed has to keep loading. The original
    // note attributes this to `#[serde(default)]` on the field.
    use crate::conversation::message::MessageContent;

    let legacy_line = r#"{"AssistantWithToolCalls":{"text":"hi","tool_calls":[]}}"#;
    let parsed: MessageContent = serde_json::from_str(legacy_line)
        .expect("old-format AssistantWithToolCalls should deserialize");

    if let MessageContent::AssistantWithToolCalls {
        text,
        reasoning_content,
        ..
    } = &parsed
    {
        assert_eq!(text.as_deref(), Some("hi"));
        assert!(reasoning_content.is_none());
    } else {
        panic!("unexpected variant: {:?}", parsed);
    }
}
1903
1904 // ── provider truncation detector ──
1905
#[test]
fn truncation_detector_flags_gitcode_real_world_ratio() {
    // Canary taken from a real session (5/8 atomgr): GitCode reported
    // 6233 prompt_tokens for a body atomcode counted at ~78K content
    // chars, i.e. a ratio of 12.58. If `check_truncation` ever stops
    // firing on these numbers, weak-model debugging gets harder by hours.
    let verdict = check_truncation(78_381, 6_233);
    let ratio = verdict.expect("12.58 chars/token must be flagged as truncation");

    let above_floor = ratio > 12.0;
    let below_ceiling = ratio < 13.0;
    assert!(above_floor && below_ceiling, "ratio={}", ratio);
}
1916
#[test]
fn truncation_detector_silent_on_normal_tokenizer() {
    // Same session, healthy upstream (Siliconflow Pro/zai-org/GLM-5):
    // 127K chars over 45K tokens is 2.81 chars/token — no warning allowed.
    let verdict = check_truncation(127_763, 45_518);
    assert!(verdict.is_none());
}
1923
#[test]
fn truncation_detector_silent_on_english_heavy_4chars_per_token() {
    // A pure-English, code-only request can reach ~4 chars/token. The
    // threshold (6.0, per the original note) leaves headroom so that
    // non-truncated requests never add noise to the log.
    let verdict = check_truncation(40_000, 10_000);
    assert!(verdict.is_none());
}
1931
#[test]
fn truncation_detector_skips_tiny_bodies() {
    // For a system-only ping or a bare "hi" the ratio is dominated by
    // noise, so (per the original note) the detector stays silent below
    // 4K chars whatever the token count is.
    let verdict = check_truncation(1_500, 100);
    assert!(verdict.is_none());
}
1939
#[test]
fn truncation_detector_handles_zero_prompt_tokens() {
    // Some self-hosted gateways drop usage entirely, which shows up as a
    // zero token count. The detector must stay silent rather than divide
    // by zero.
    let verdict = check_truncation(50_000, 0);
    assert!(verdict.is_none());
}
1946
#[test]
fn sum_message_content_chars_sums_strings_and_tool_args() {
    // The tool-call arguments decode to `{"path":"a"}`, i.e. 12 chars.
    let tool_call = serde_json::json!({
        "function": {"name": "read_file", "arguments": "{\"path\":\"a\"}"}
    });
    let body = serde_json::json!({
        "model": "x",
        "messages": [
            {"role": "system", "content": "abc"},                          // 3
            {"role": "user", "content": "hello"},                          // 5
            {"role": "assistant", "content": "", "tool_calls": [tool_call]}, // 0 + 12
            {"role": "tool", "content": "result"}                          // 6
        ]
    });

    let counted = sum_message_content_chars(&body);
    assert_eq!(counted, 3 + 5 + 12 + 6);
}
1965
#[test]
fn sum_message_content_chars_ignores_image_urls_in_multipart() {
    // URL/base64 strings in vision payloads are not real text tokens;
    // counting them would falsely inflate the chars/token ratio for
    // vision requests. Only the text part may contribute.
    let text_part = serde_json::json!({"type": "text", "text": "describe"}); // 8
    let image_part = serde_json::json!({
        "type": "image_url",
        "image_url": {"url": "data:image/png;base64,AAAAAAAAAA"}
    });
    let body = serde_json::json!({
        "messages": [{
            "role": "user",
            "content": [text_part, image_part]
        }]
    });

    let counted = sum_message_content_chars(&body);
    assert_eq!(counted, 8);
}
1983
#[test]
fn sum_message_content_chars_safe_on_missing_messages() {
    // A body without a `messages` key must count as zero chars.
    let body = serde_json::json!({"model": "x"});
    let counted = sum_message_content_chars(&body);
    assert_eq!(counted, 0);
}
1989
1990 // ── abrupt-close gateway-error discriminator ───────────────────
1991 //
1992 // GLM-5.1 / litellm-style gateways respond to a 429 by streaming
1993 // a single SSE chunk carrying a Chinese error message and then
1994 // hanging up without `data: [DONE]`. Before this code path
1995 // existed, the provider mapped both that case AND "real
1996 // mid-output truncation" to `Done { truncated: true }`, causing
1997 // the agent's resume-from-truncation retry to re-fire the same
1998 // request and render the same error message twice. Tests below
1999 // pin the new behavior:
2000 //
2001 // * 1 short content chunk, no `[DONE]` → Error
2002 // * many content chunks + abrupt close → Done(truncated=true)
2003 // * 1 short chunk + tool_call + abrupt close → Done(truncated=true)
2004 // (model was making tool progress; let resume retry try again)
2005
2006 use crate::config::provider::ProviderConfig;
2007 use crate::provider::LlmProvider;
2008 use futures::StreamExt;
2009 use wiremock::matchers::{method, path};
2010 use wiremock::{Mock, MockServer, ResponseTemplate};
2011
2012 fn provider_pointing_at(url: &str) -> OpenAiProvider {
2013 OpenAiProvider::new(&ProviderConfig {
2014 provider_type: "openai".into(),
2015 api_key: Some("sk-test".into()),
2016 model: "test-model".into(),
2017 base_url: Some(format!("{}/v1", url)),
2018 system_prompt: None,
2019 user_agent: None,
2020 context_window: 8000,
2021 max_tokens: Some(1024),
2022 thinking_type: None,
2023 thinking_keep: None,
2024 reasoning_history: None,
2025 thinking_enabled: None,
2026 thinking_budget: None,
2027 skip_tls_verify: false,
2028 ephemeral: false,
2029 })
2030 .expect("provider construction")
2031 }
2032
2033 async fn collect_stream(p: &OpenAiProvider) -> Vec<StreamEvent> {
2034 let msg = Message {
2035 role: Role::User,
2036 content: MessageContent::Text("hi".into()),
2037 };
2038 let mut stream = p.chat_stream(&[msg], None).expect("stream");
2039 let mut out = Vec::new();
2040 while let Some(ev) = stream.next().await {
2041 match ev {
2042 Ok(e) => out.push(e),
2043 Err(e) => panic!("transport error: {:#}", e),
2044 }
2045 }
2046 out
2047 }
2048
2049 /// Gateway streams ONE chunk with an error blob, no DONE, then
2050 /// closes. Provider must surface that as `Error(blob)`, NOT as
2051 /// `Done { truncated: true }` (which would trigger the agent's
2052 /// resume-retry and render the same blob twice).
2053 #[tokio::test]
2054 async fn abrupt_close_with_single_error_chunk_becomes_stream_error() {
2055 let server = MockServer::start().await;
2056 let sse = "data: {\"choices\":[{\"delta\":{\"content\":\
2057 \"模型「GLM-5.1」的请求负载过高,请稍后再试。\"}}]}\n\n";
2058 Mock::given(method("POST"))
2059 .and(path("/v1/chat/completions"))
2060 .respond_with(
2061 ResponseTemplate::new(200)
2062 .insert_header("content-type", "text/event-stream")
2063 .set_body_string(sse),
2064 )
2065 .mount(&server)
2066 .await;
2067
2068 let p = provider_pointing_at(&server.uri());
2069 let events = collect_stream(&p).await;
2070 let has_error = events
2071 .iter()
2072 .any(|e| matches!(e, StreamEvent::Error(s) if s.contains("请求负载过高")));
2073 let has_truncated_done = events
2074 .iter()
2075 .any(|e| matches!(e, StreamEvent::Done { truncated: true }));
2076 let has_marker_delta = events.iter().any(|e| {
2077 matches!(e, StreamEvent::Delta(s) if s.contains("stream ended without close marker"))
2078 });
2079 assert!(
2080 has_error,
2081 "expected StreamEvent::Error(gateway blob), got: {:?}",
2082 events
2083 );
2084 assert!(
2085 !has_truncated_done,
2086 "abrupt close on tiny error blob must NOT emit Done(truncated=true): {:?}",
2087 events
2088 );
2089 assert!(
2090 !has_marker_delta,
2091 "abrupt close on tiny error blob must NOT emit the [stream ended …] marker delta: {:?}",
2092 events
2093 );
2094 }
2095
2096 /// Real-truncation case: many chunks of substantive content,
2097 /// then abrupt close (no DONE / no finish_reason). Stays on the
2098 /// existing `Done { truncated: true }` path so the agent's
2099 /// "resume where you left off" retry can salvage the partial
2100 /// output (table-cut, list-cut, etc.).
2101 #[tokio::test]
2102 async fn abrupt_close_with_substantive_content_still_emits_truncated_done() {
2103 let server = MockServer::start().await;
2104 // 5 chunks × ~50 chars each = ~250 chars of real content.
2105 // Above the 200-char and 2-chunk thresholds → not a
2106 // gateway error.
2107 let mut sse = String::new();
2108 for i in 0..5 {
2109 sse.push_str(&format!(
2110 "data: {{\"choices\":[{{\"delta\":{{\"content\":\
2111 \"line {} with enough content to clear the heuristic thresholds. \"}}}}]}}\n\n",
2112 i
2113 ));
2114 }
2115 Mock::given(method("POST"))
2116 .and(path("/v1/chat/completions"))
2117 .respond_with(
2118 ResponseTemplate::new(200)
2119 .insert_header("content-type", "text/event-stream")
2120 .set_body_string(sse),
2121 )
2122 .mount(&server)
2123 .await;
2124
2125 let p = provider_pointing_at(&server.uri());
2126 let events = collect_stream(&p).await;
2127 let has_truncated_done = events
2128 .iter()
2129 .any(|e| matches!(e, StreamEvent::Done { truncated: true }));
2130 let has_error = events.iter().any(|e| matches!(e, StreamEvent::Error(_)));
2131 assert!(
2132 has_truncated_done,
2133 "substantive content + abrupt close must keep Done(truncated=true): {:?}",
2134 events
2135 );
2136 assert!(
2137 !has_error,
2138 "real truncation must NOT be misclassified as Error: {:?}",
2139 events
2140 );
2141 }
2142
2143 // 401 from a non-atomgit gateway must keep the verbatim server
2144 // error message — user-supplied `sk-...` API keys are not
2145 // refreshable by us, and developers debugging a bad key need to
2146 // see what the upstream actually said. The new auth-retry path is
2147 // only allowed to engage for AtomGit gateway hosts.
2148 #[tokio::test]
2149 async fn non_atomgit_gateway_401_keeps_verbatim_error() {
2150 let server = MockServer::start().await;
2151 let body = r#"{"error":{"message":"invalid_api_key"}}"#;
2152 Mock::given(method("POST"))
2153 .and(path("/v1/chat/completions"))
2154 .respond_with(
2155 ResponseTemplate::new(401)
2156 .insert_header("content-type", "application/json")
2157 .set_body_string(body),
2158 )
2159 .mount(&server)
2160 .await;
2161
2162 let p = provider_pointing_at(&server.uri());
2163 let events = collect_stream(&p).await;
2164 let err_msg = events
2165 .iter()
2166 .find_map(|e| match e {
2167 StreamEvent::Error(s) => Some(s.clone()),
2168 _ => None,
2169 })
2170 .unwrap_or_else(|| panic!("expected StreamEvent::Error, got: {:?}", events));
2171 assert!(
2172 err_msg.contains("invalid_api_key"),
2173 "non-atomgit 401 must include verbatim server message: {}",
2174 err_msg
2175 );
2176 // The i18n-friendly hint is reserved for atomgit-gateway 401s;
2177 // it must NOT leak into the generic OpenAI / sk-... path.
2178 let friendly = crate::i18n::t(crate::i18n::Msg::ChatAuthExpired).to_string();
2179 assert!(
2180 !err_msg.contains(&friendly),
2181 "non-atomgit 401 must NOT receive the AtomGit /login hint: got {}",
2182 err_msg
2183 );
2184 }
2185}
2186
#[cfg(test)]
mod chat_auth_expired_i18n_tests {
    use crate::i18n::{t_with, Locale, Msg};

    /// Renders the `ChatAuthExpired` message for the given locale.
    fn rendered(locale: Locale) -> String {
        t_with(locale, Msg::ChatAuthExpired).to_string()
    }

    #[test]
    fn message_is_non_empty_in_both_locales() {
        // An empty hint would defeat the purpose of this branch, which is
        // to point the user at /login — both locales must say SOMETHING.
        let zh = rendered(Locale::ZhCn);
        let en = rendered(Locale::En);

        let zh_blank = zh.is_empty();
        let en_blank = en.is_empty();
        assert!(!zh_blank, "zh message must be populated");
        assert!(!en_blank, "en message must be populated");
    }

    #[test]
    fn message_mentions_login_in_both_locales() {
        // The message exists to send the user back to re-authenticate; a
        // translation that drops the `/login` reference makes the hint
        // useless, so guard against that here.
        let zh = rendered(Locale::ZhCn);
        let en = rendered(Locale::En);

        let zh_has_login = zh.contains("/login");
        let en_has_login = en.contains("/login");
        assert!(zh_has_login, "zh message must mention /login: {}", zh);
        assert!(en_has_login, "en message must mention /login: {}", en);
    }
}
2212
#[cfg(test)]
mod codingplan_signing_tests {
    use super::*;

    #[test]
    fn build_signed_headers_returns_empty_for_non_atomgit_host() {
        // A host that is not an AtomGit gateway needs no signing: the
        // call succeeds and yields no headers.
        let outcome = build_codingplan_headers("https://api.openai.com/v1", b"{}", None);
        let headers = outcome.expect("non-atomgit host must not error");
        assert!(headers.is_empty(), "got unexpected headers: {:?}", headers);
    }

    #[test]
    fn build_signed_headers_errors_when_atomgit_host_in_open_source_build() {
        // In the open-source build signer() is UnavailableSigner (per the
        // original note), so a request bound for atomgit has to fail with
        // the localised "official build" hint.
        let auth = Some(("dummy-user-id", "dummy-token"));
        let outcome = build_codingplan_headers("https://llm-api.atomgit.com/v1", b"{}", auth);
        let err = outcome.expect_err("open-source build must error out");

        let msg = format!("{:#}", err);
        let mentions_official = msg.contains("official") || msg.contains("官方");
        assert!(
            mentions_official,
            "error message should mention the official-build requirement, got: {msg}",
        );
    }

    #[test]
    fn build_signed_headers_errors_when_atomgit_host_with_empty_auth() {
        // Empty user id and token against an atomgit host must be
        // rejected with some non-empty error text.
        let auth = Some(("", ""));
        let outcome = build_codingplan_headers("https://llm-api.atomgit.com/v1", b"{}", auth);
        let err = outcome.expect_err("empty auth must error");

        let rendered = format!("{:#}", err);
        assert!(!rendered.is_empty());
    }
}
2255}