Skip to main content

atomcode_core/provider/
openai.rs

1use std::pin::Pin;
2
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use futures::stream::StreamExt;
6use futures::Stream;
7use reqwest::Client;
8use serde::Deserialize;
9use serde_json::json;
10
11use crate::config::provider::ProviderConfig;
12use crate::conversation::message::{Message, MessageContent, Role};
13use crate::stream::StreamEvent;
14use crate::tool::ToolDef;
15
16use crate::auth::oauth::{get_stored_auth, refresh_access_token};
17use crate::coding_plan::crypto::{self, SignError, SignInput};
18use crate::i18n::{t, Msg};
19
20use super::{LlmProvider, ReasoningPolicy};
21
22/// Compute the signing headers (if any) for an outbound request.
23///
24/// Returns:
25/// - `Ok(vec![])` — host doesn't require signing (the common case for
26///   user-configured providers); caller proceeds unchanged.
27/// - `Ok(non-empty)` — host requires signing; caller merges these
28///   headers onto the request before `.send().await`.
29/// - `Err(_)` — host requires signing but we cannot produce a valid
30///   signature (signer unavailable, no stored auth, etc.). Caller
31///   surfaces the error to the user.
32///
33/// `override_auth` is a test seam: production callers pass `None` and
34/// the function reads `get_stored_auth()`.
35fn build_codingplan_headers(
36    base_url: &str,
37    body_bytes: &[u8],
38    override_auth: Option<(&str, &str)>,
39) -> Result<Vec<(&'static str, String)>> {
40    if !crypto::is_atomgit_gateway(base_url) {
41        return Ok(Vec::new());
42    }
43
44    // `CpAuthRequired` (no stored auth, or empty user.id /
45    // access_token) is a separate failure mode from
46    // `CpOfficialBuildRequired` (open-source build / unavailable
47    // signer). Users on an official build with no `~/.atomcode/auth.toml`
48    // would otherwise see the misleading "need official build"
49    // message — the build IS official, they just haven't logged in
50    // yet. Steer them to `/codingplan` instead.
51    let (user_id_string, token_string);
52    let (user_id, oauth_token) = match override_auth {
53        Some((uid, tok)) => (uid, tok),
54        None => {
55            let auth = get_stored_auth()
56                .ok_or_else(|| anyhow::anyhow!("{}", t(Msg::CpAuthRequired)))?;
57            user_id_string = auth.user.id.clone();
58            token_string = auth.access_token.clone();
59            (user_id_string.as_str(), token_string.as_str())
60        }
61    };
62
63    if user_id.is_empty() || oauth_token.is_empty() {
64        return Err(anyhow::anyhow!("{}", t(Msg::CpAuthRequired)));
65    }
66
67    let path = url::Url::parse(base_url)
68        .ok()
69        .map(|u| u.path().to_string())
70        .unwrap_or_else(|| "/v1/chat/completions".to_string());
71    let path = if path.ends_with("/chat/completions") {
72        path
73    } else {
74        format!("{}/chat/completions", path.trim_end_matches('/'))
75    };
76
77    let mut nonce = [0u8; 16];
78    getrandom::getrandom(&mut nonce)
79        .map_err(|e| anyhow::anyhow!("nonce generation failed: {e}"))?;
80    let ts = std::time::SystemTime::now()
81        .duration_since(std::time::UNIX_EPOCH)
82        .map_err(|e| anyhow::anyhow!("system clock before UNIX epoch: {e}"))?
83        .as_secs();
84
85    let input = SignInput {
86        method: "POST",
87        path: &path,
88        body: body_bytes,
89        oauth_token,
90        user_id,
91        timestamp_unix: ts,
92        nonce,
93    };
94
95    match crypto::signer().sign(input) {
96        Ok(out) => Ok(out.headers),
97        Err(SignError::Unavailable) => {
98            Err(anyhow::anyhow!("{}", t(Msg::CpOfficialBuildRequired)))
99        }
100        Err(SignError::Derive(detail)) => Err(anyhow::anyhow!(
101            "{} (signing-key derivation: {})",
102            t(Msg::CpOfficialBuildRequired),
103            detail
104        )),
105    }
106}
107
/// Provider for OpenAI-compatible chat-completions endpoints.
///
/// Built from a `ProviderConfig` by `OpenAiProvider::new`; the
/// `LlmProvider` implementation streams responses from `base_url`.
pub struct OpenAiProvider {
    /// HTTP client produced by `super::build_http_client` from the
    /// configured user agent and TLS-verification flag. Cloned into the
    /// background task on every `chat_stream` call.
    client: Client,
    /// Shared so `chat_stream` can refresh the OAuth access_token in-place
    /// when an AtomGit-gateway request comes back 401, without rebuilding
    /// the whole provider. For non-AtomGit providers this stays as the
    /// user-supplied API key for the lifetime of the process — there's
    /// no auth flow for those.
    api_key: std::sync::Arc<tokio::sync::RwLock<String>>,
    /// Model identifier, sent as `model` in the request body.
    model: String,
    /// API root. Falls back to `https://api.openai.com/v1` when the
    /// config leaves it unset. Also the input to the AtomGit-gateway
    /// check that gates request signing and the 401 token refresh.
    base_url: String,
    /// Completion budget, sent as `max_tokens`. Uses the configured value
    /// when present; otherwise a quarter of the context window, clamped
    /// to 8,000..=16,384.
    max_tokens: usize,
    /// Kimi-family thinking knob: `thinking.type` in the request body.
    /// Only emitted when the user configures it — other OpenAI-compatible
    /// gateways may reject unknown top-level fields.
    thinking_type: Option<String>,
    /// Kimi K2.6 Preserved Thinking: `thinking.keep` in the request body.
    thinking_keep: Option<String>,
    /// User-provided override for the reasoning-history echo policy. When
    /// `Some`, bypasses the auto-detect heuristic entirely. Parsed from
    /// `ProviderConfig::reasoning_history` at construction so bad values
    /// fail early at load time with a clear error, not silently mid-turn.
    reasoning_history_override: Option<ReasoningPolicy>,
    /// Whether the active model accepts image inputs. Drives `MultiPart`
    /// serialisation: vision-capable → OpenAI image_url schema, text-only
    /// → flat string. Computed once from `ProviderConfig::accepts_images()`
    /// at construction; a `/model` switch rebuilds the provider so this
    /// stays in sync with the live config.
    supports_vision: bool,
}
137
138impl OpenAiProvider {
139    pub fn new(config: &ProviderConfig) -> Result<Self> {
140        let api_key = config
141            .api_key
142            .clone()
143            .context("OpenAI provider requires an api_key")?;
144        let reasoning_history_override = match config.reasoning_history.as_deref() {
145            None => None,
146            Some(s) => match s.trim().to_ascii_lowercase().as_str() {
147                "include" => Some(ReasoningPolicy::Include),
148                "exclude" => Some(ReasoningPolicy::Exclude),
149                other => anyhow::bail!(
150                    "Invalid `reasoning_history` value {:?} for provider type '{}' — \
151                     expected \"include\" or \"exclude\" (unset = use auto-detect)",
152                    other,
153                    config.provider_type,
154                ),
155            },
156        };
157        Ok(Self {
158            client: super::build_http_client(config.user_agent.as_deref(), config.skip_tls_verify),
159            api_key: std::sync::Arc::new(tokio::sync::RwLock::new(api_key)),
160            model: config.model.clone(),
161            base_url: config
162                .base_url
163                .clone()
164                .unwrap_or_else(|| "https://api.openai.com/v1".to_string()),
165            // Cap at 16K: prevents models from spending 250s on thinking
166            // with zero visible output. CC uses fixed 16-32K, not proportional.
167            max_tokens: config
168                .max_tokens
169                .unwrap_or((config.context_window / 4).clamp(8_000, 16_384)),
170            thinking_type: config.thinking_type.clone(),
171            thinking_keep: config.thinking_keep.clone(),
172            reasoning_history_override,
173            supports_vision: config.accepts_images(),
174        })
175    }
176
177    /// Derive the reasoning echo policy from model name / base_url.
178    /// - `kimi-*` / base_url contains `moonshot` → Include (Moonshot requires
179    ///   reasoning_content on every assistant tool_call or returns 400).
180    /// - `deepseek-reasoner` / `deepseek-r1` (V3 family) → Exclude (DeepSeek
181    ///   V3 rejects the request if reasoning_content is echoed back).
182    /// - `deepseek-v4*` (V4 family thinking mode) → Include. DeepSeek flipped
183    ///   the contract in V4: thinking-mode requests with tool calls now
184    ///   REQUIRE reasoning_content on every historical assistant tool_call
185    ///   message, or the API returns 400 "The `reasoning_content` in the
186    ///   thinking mode must be passed back to the API". See
187    ///   <https://api-docs.deepseek.com/zh-cn/guides/thinking_mode>.
188    /// - Other OpenAI-compatible endpoints → Exclude (safe default; normal
189    ///   OpenAI models don't emit reasoning_content, so there's nothing to
190    ///   strip, and non-thinking models typically ignore the field).
191    fn derive_reasoning_policy(model: &str, base_url: &str) -> ReasoningPolicy {
192        let m = model.to_ascii_lowercase();
193        let u = base_url.to_ascii_lowercase();
194        if m.contains("deepseek-reasoner") || m.contains("deepseek-r1") {
195            return ReasoningPolicy::Exclude;
196        }
197        if m.contains("deepseek-v4") {
198            return ReasoningPolicy::Include;
199        }
200        if m.starts_with("kimi-")
201            || m.starts_with("moonshot")
202            || u.contains("moonshot")
203            || u.contains("kimi")
204            || u.contains("xiaomimimo")
205            || u.contains("mimo")
206        {
207            return ReasoningPolicy::Include;
208        }
209        ReasoningPolicy::Exclude
210    }
211
212    /// Build Kimi's `thinking` request-body object from the two flat
213    /// config fields. Returns `None` when both are unset so the caller
214    /// omits the whole key — safer for non-Kimi gateways that might
215    /// error on an unknown top-level `thinking`.
216    fn thinking_body_value(
217        thinking_type: Option<&str>,
218        thinking_keep: Option<&str>,
219    ) -> Option<serde_json::Value> {
220        if thinking_type.is_none() && thinking_keep.is_none() {
221            return None;
222        }
223        let mut obj = serde_json::Map::new();
224        if let Some(t) = thinking_type {
225            obj.insert("type".into(), json!(t));
226        }
227        if let Some(k) = thinking_keep {
228            obj.insert("keep".into(), json!(k));
229        }
230        Some(serde_json::Value::Object(obj))
231    }
232
233    /// `supports_vision` toggles how `MessageContent::MultiPart` historical
234    /// turns are serialised. When the target model accepts images, the
235    /// content is emitted as the OpenAI vision schema (array of
236    /// `image_url` + `text` blocks). When it doesn't (text-only proxies
237    /// like GLM-5.1 on ModelArts), `MultiPart` is degraded to a flat
238    /// string — keeps the conversation replayable across `/model`
239    /// switches between vision-capable and text-only providers without
240    /// throwing the upstream's `invalid field(s): text, type` 400.
241    fn format_messages(
242        messages: &[Message],
243        reasoning_policy: ReasoningPolicy,
244        supports_vision: bool,
245    ) -> Vec<serde_json::Value> {
246        messages
247            .iter()
248            .filter_map(|m| {
249                match &m.content {
250                    MessageContent::Text(s) => {
251                        // Tool role with plain Text is invalid for the OpenAI API —
252                        // tool results must use MessageContent::ToolResult.
253                        let role = match m.role {
254                            Role::System => "system",
255                            Role::User => "user",
256                            Role::Assistant => "assistant",
257                            Role::Tool => return None,
258                        };
259                        // Skip empty messages
260                        if s.trim().is_empty() {
261                            return None;
262                        }
263                        let mut obj = json!({"role": role, "content": s});
264                        // DeepSeek V4 tool-call round: per official docs, when a
265                        // turn had tool_calls ANYWHERE, ALL reasoning_content from
266                        // that turn (including the final-answer text's reasoning)
267                        // must be echoed in every subsequent request — 400
268                        // otherwise. Our Text variant doesn't persist per-turn
269                        // reasoning, so emit a placeholder under Include. The
270                        // no-tool-call case (image: 思维链 dropped) is a "may be
271                        // sent, will be ignored" spec, not a rejection — safe to
272                        // always emit. Kimi only validates tool_call messages, so
273                        // the extra key on Text is accepted there too.
274                        if matches!(m.role, Role::Assistant)
275                            && matches!(reasoning_policy, ReasoningPolicy::Include)
276                        {
277                            obj["reasoning_content"] = json!("(no reasoning recorded)");
278                        }
279                        Some(obj)
280                    }
281                    MessageContent::AssistantWithToolCalls {
282                        text,
283                        tool_calls,
284                        reasoning_content,
285                        // Anthropic-only field; OpenAI-style endpoints don't
286                        // accept `thinking` content blocks. We persist them
287                        // for cross-provider switches but don't emit here.
288                        thinking_blocks: _,
289                    } => {
290                        if tool_calls.is_empty() {
291                            // No tool calls — send as plain assistant text
292                            let t = text.as_deref().unwrap_or("");
293                            if t.is_empty() {
294                                return None;
295                            }
296                            let mut obj = json!({"role": "assistant", "content": t});
297                            if matches!(reasoning_policy, ReasoningPolicy::Include) {
298                                let echo = reasoning_content
299                                    .as_deref()
300                                    .filter(|s| !s.is_empty())
301                                    .unwrap_or("(no reasoning recorded)");
302                                obj["reasoning_content"] = json!(echo);
303                            }
304                            return Some(obj);
305                        }
306                        let mut msg = json!({"role": "assistant"});
307                        // Always include content field — some APIs (DeepSeek/SiliconFlow)
308                        // reject messages without it even when tool_calls is present.
309                        msg["content"] = json!(text.as_deref().unwrap_or(""));
310                        // Thinking-model providers require reasoning_content to
311                        // appear on every assistant tool_call message in history.
312                        // Kimi only checks the key is present (empty ok). DeepSeek
313                        // V4 additionally rejects an empty string ("must be passed
314                        // back to the API"), so when we have no captured reasoning
315                        // — cross-provider handoff (glm→deepseek), pre-fix session,
316                        // or a non-thinking model that still tool-called — we emit
317                        // a short non-empty placeholder. Both APIs accept any
318                        // non-empty string, DeepSeek does the opposite of Kimi for
319                        // Exclude so this block is gated on policy.
320                        if matches!(reasoning_policy, ReasoningPolicy::Include) {
321                            let echo = reasoning_content
322                                .as_deref()
323                                .filter(|s| !s.is_empty())
324                                .unwrap_or("(no reasoning recorded)");
325                            msg["reasoning_content"] = json!(echo);
326                        }
327                        msg["tool_calls"] = json!(tool_calls
328                            .iter()
329                            .map(|tc| {
330                                // Ensure arguments is valid JSON — some APIs reject invalid JSON strings.
331                                let args =
332                                    if serde_json::from_str::<serde_json::Value>(&tc.arguments)
333                                        .is_ok()
334                                    {
335                                        tc.arguments.clone()
336                                    } else {
337                                        // Try repair; if still invalid, wrap as a simple object
338                                        let repaired = repair_tool_args(&tc.arguments);
339                                        if serde_json::from_str::<serde_json::Value>(&repaired)
340                                            .is_ok()
341                                        {
342                                            repaired
343                                        } else {
344                                            json!({"input": tc.arguments}).to_string()
345                                        }
346                                    };
347                                json!({
348                                    "id": tc.id,
349                                    "type": "function",
350                                    "function": {
351                                        "name": tc.name,
352                                        "arguments": args,
353                                    }
354                                })
355                            })
356                            .collect::<Vec<_>>());
357                        Some(msg)
358                    }
359                    MessageContent::MultiPart { text, images } => {
360                        if supports_vision {
361                            let mut parts: Vec<serde_json::Value> = Vec::new();
362                            for img in images {
363                                parts.push(json!({
364                                    "type": "image_url",
365                                    "image_url": {
366                                        "url": format!(
367                                            "data:{};base64,{}",
368                                            img.media_type, img.data
369                                        ),
370                                    }
371                                }));
372                            }
373                            if let Some(t) = text {
374                                parts.push(json!({"type": "text", "text": t}));
375                            }
376                            Some(json!({"role": "user", "content": parts}))
377                        } else {
378                            // Degrade to text-only — the model's wire schema
379                            // doesn't support image blocks. The user's
380                            // caption survives (and already has `[Image #N]`
381                            // markers from the input buffer); the image bytes
382                            // simply aren't representable here.
383                            let content = match text {
384                                Some(t) if !t.is_empty() => t.clone(),
385                                _ => "[image attached]".to_string(),
386                            };
387                            Some(json!({"role": "user", "content": content}))
388                        }
389                    }
390                    MessageContent::ToolResult(r) => {
391                        if r.call_id.is_empty() {
392                            return None;
393                        }
394                        Some(json!({
395                            "role": "tool",
396                            "tool_call_id": r.call_id,
397                            "content": r.output,
398                        }))
399                    }
400                    MessageContent::ToolResultRef(r) => {
401                        if r.call_id.is_empty() {
402                            return None;
403                        }
404                        Some(json!({
405                            "role": "tool",
406                            "tool_call_id": r.call_id,
407                            "content": r.summary,
408                        }))
409                    }
410                }
411            })
412            .collect()
413    }
414}
415
/// Deserialization target for a chat-completion chunk.
///
/// NOTE(review): presumably one `data:` payload of the SSE stream — the
/// parsing code is outside this view; confirm.
#[derive(Deserialize)]
struct ChatChunk {
    /// Choices in this chunk; deserializes to an empty list when the
    /// `choices` key is absent.
    #[serde(default)]
    choices: Vec<ChunkChoice>,
    /// Token accounting, when the chunk carries any.
    usage: Option<ChunkUsage>,
}
422
/// Token accounting as reported by the provider. Every field is optional
/// because providers disagree on which ones they send.
#[derive(Deserialize)]
struct ChunkUsage {
    /// Prompt-side token count.
    prompt_tokens: Option<usize>,
    /// Completion-side token count.
    completion_tokens: Option<usize>,
    // Provider-specific cache fields (different providers use different names):
    // OpenAI: prompt_tokens_details.cached_tokens
    // DeepSeek/SiliconFlow: prompt_cache_hit_tokens
    // Zhipu: cached_tokens
    // NOTE(review): which field takes precedence when several are present
    // is decided by the consumer, which is outside this view.
    prompt_cache_hit_tokens: Option<usize>,
    cached_tokens: Option<usize>,
    prompt_tokens_details: Option<PromptTokensDetails>,
}
435
/// Nested `prompt_tokens_details` object of [`ChunkUsage`] — the
/// OpenAI-style location for the cached-token count.
#[derive(Deserialize)]
struct PromptTokensDetails {
    /// Cached prompt tokens, when reported.
    cached_tokens: Option<usize>,
}
440
/// One entry of a chunk's `choices` array.
#[derive(Deserialize)]
struct ChunkChoice {
    /// Incremental payload. Required: it is neither `Option` nor
    /// `#[serde(default)]`, so a choice without a `delta` key fails to
    /// deserialize.
    delta: ChunkDelta,
    /// Reason the provider gives for ending the choice, when present.
    finish_reason: Option<String>,
}
446
/// Incremental payload of a [`ChunkChoice`]. All fields are optional, so
/// an empty `delta` object deserializes fine.
#[derive(Deserialize)]
struct ChunkDelta {
    /// Visible answer text carried by this delta, if any.
    content: Option<String>,
    /// MiniMax M2.7 / DeepSeek R1 send thinking via this field. We forward
    /// it as `StreamEvent::Reasoning` so `TurnRunner` can promote it to
    /// the final text if `content` ends up empty — some gateways route
    /// *entire* responses to `reasoning_content` for these models, which
    /// previously showed up as a silent 0-token "Nailed it" turn.
    reasoning_content: Option<String>,
    /// Tool-call fragments carried by this delta, if any.
    tool_calls: Option<Vec<DeltaToolCall>>,
}
458
/// One tool-call fragment inside a [`ChunkDelta`]. Every field is
/// optional.
///
/// NOTE(review): presumably fragments are stitched together by `index`
/// across chunks — the assembly code is outside this view; confirm.
#[derive(Deserialize)]
struct DeltaToolCall {
    /// Position of the tool call this fragment belongs to, when given.
    index: Option<usize>,
    /// Tool-call id, when given.
    id: Option<String>,
    /// Function name / arguments fragment, when given.
    function: Option<DeltaFunction>,
}
465
/// `function` object of a [`DeltaToolCall`]; both parts are optional.
#[derive(Deserialize)]
struct DeltaFunction {
    /// Function name, when this fragment carries it.
    name: Option<String>,
    /// Arguments text, when this fragment carries any.
    /// NOTE(review): presumably a partial JSON string to be concatenated
    /// by the consumer — confirm.
    arguments: Option<String>,
}
471
/// Deserialization target for a complete chat-completion response body.
///
/// NOTE(review): presumably the non-streaming counterpart of
/// [`ChatChunk`] — its consumer is outside this view; confirm.
#[derive(Deserialize)]
struct ChatCompletionResponse {
    /// Choices in the response; deserializes to an empty list when the
    /// `choices` key is absent.
    #[serde(default)]
    choices: Vec<ResponseChoice>,
    /// Token accounting, when present. Shares [`ChunkUsage`] with the
    /// chunk shape.
    usage: Option<ChunkUsage>,
}
478
/// One entry of a [`ChatCompletionResponse`]'s `choices` array.
#[derive(Deserialize)]
struct ResponseChoice {
    /// The message for this choice; optional, so a choice without a
    /// `message` key still deserializes.
    message: Option<ResponseMessage>,
    /// Reason the provider gives for ending the choice, when present.
    finish_reason: Option<String>,
}
484
/// Message body of a [`ResponseChoice`]; both fields are optional.
#[derive(Deserialize)]
struct ResponseMessage {
    /// Visible answer text, if any.
    content: Option<String>,
    /// Reasoning text, for providers that report it in a separate field.
    reasoning_content: Option<String>,
}
490
491#[async_trait]
492impl LlmProvider for OpenAiProvider {
493    fn chat_stream(
494        &self,
495        messages: &[Message],
496        tools: Option<&[ToolDef]>,
497    ) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>>> {
498        let url = normalize_base_url(&self.base_url);
499        let mut body = json!({
500            "model": self.model,
501            "messages": Self::format_messages(messages, self.reasoning_history_policy(), self.supports_vision),
502            "stream": true,
503            "stream_options": { "include_usage": true },
504            "max_tokens": self.max_tokens,
505        });
506
507        if let Some(tool_defs) = tools {
508            if !tool_defs.is_empty() {
509                body["tools"] = json!(tool_defs
510                    .iter()
511                    .map(|td| json!({
512                        "type": "function",
513                        "function": {
514                            "name": td.name,
515                            "description": td.description,
516                            "parameters": td.parameters,
517                        }
518                    }))
519                    .collect::<Vec<_>>());
520                // Allow the model to decide whether to call multiple tools in parallel
521            }
522        }
523
524        // Kimi K2.5 / K2.6 top-level `thinking` object. Only sent when the
525        // user configured it — other OpenAI-compatible gateways may reject
526        // unknown fields, and omitting lets Kimi's default behavior apply.
527        if let Some(th) =
528            Self::thinking_body_value(self.thinking_type.as_deref(), self.thinking_keep.as_deref())
529        {
530            body["thinking"] = th;
531        }
532
533        let policy = crate::provider::retry::RetryPolicy::default_policy();
534
535        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
536
537        // ── TEMP WIRE-DUMP (debug only) ────────────────────────────────
538        // Set ATOMCODE_WIRE_DUMP=1 to dump every outbound LLM request body
539        // to ~/.atomcode/wire-dump/<timestamp>.json so we can verify what
540        // litellm / the proxy actually receives. Used to diagnose
541        // "tool_call results appear empty to the model" — comparing the
542        // wire body's `messages[N].content` against the conversation
543        // snapshot proves whether atomcode or the proxy is the source of
544        // truncation. Remove once root-caused.
545        if std::env::var("ATOMCODE_WIRE_DUMP").ok().as_deref() == Some("1") {
546            if let Ok(home) = std::env::var("HOME") {
547                let dir = std::path::PathBuf::from(home).join(".atomcode/wire-dump");
548                let _ = std::fs::create_dir_all(&dir);
549                let ts = std::time::SystemTime::now()
550                    .duration_since(std::time::UNIX_EPOCH)
551                    .map(|d| format!("{}.{:09}", d.as_secs(), d.subsec_nanos()))
552                    .unwrap_or_else(|_| "0".to_string());
553                let path = dir.join(format!("{}.json", ts));
554                if let Ok(serialized) = serde_json::to_string_pretty(&body) {
555                    let _ = std::fs::write(&path, serialized);
556                }
557            }
558        }
559        // ────────────────────────────────────────────────────────────────
560
561        // Provider truncation detector input: char count of message contents
562        // and tool_call arguments. We compare this to provider-reported
563        // prompt_tokens; if the ratio is way above any tokenizer can
564        // explain, the proxy is silently dropping content (e.g. GitCode
565        // litellm's hidden ~6.2K cap on glm-5 — 5/8 atomgr session).
566        let body_content_chars = sum_message_content_chars(&body);
567
568        // Move the pieces needed to rebuild the request into the task — the
569        // outer mid-stream retry loop reconstructs the builder on each
570        // attempt because `RequestBuilder` is single-use.
571        let client = self.client.clone();
572        let api_key = self.api_key.clone();
573        let provider_label = self.model.clone();
574        let base_url_for_signing = self.base_url.clone();
575
576        tokio::spawn(async move {
577            // Mid-stream retry: when the provider opens the stream but the
578            // chunked body errors out BEFORE any SSE `data:` line is parsed,
579            // it's safe to redo the whole request — no text/tool-call has
580            // been committed to the conversation, no UI delta has been
581            // emitted. Common cause: self-hosted endpoints that reset the
582            // connection at request open under load (the failure mode
583            // `error decoding response body` surfaces as). Once `data:` has
584            // been seen, retry would produce duplicated output, so the
585            // error is surfaced verbatim with a humanised explanation.
586            const MAX_STREAM_ATTEMPTS: u32 = 2;
587            let mut attempt: u32 = 0;
588            // AtomGit-gateway 401 reactive refresh: one shot per
589            // chat_stream call. The proactive `load_auth_token` path
590            // (provider/mod.rs) only refreshes when the local clock
591            // says the token is expired, which misses server-side
592            // revocation/rotation/clock-skew failures — those surface
593            // as a hard 401 mid-conversation. Without this flag a
594            // dead-loop is theoretically possible if the broker's
595            // refresh response still leaves us 401-able.
596            let mut auth_retry_used = false;
597            'retry: loop {
598                attempt += 1;
599                let body_bytes = match serde_json::to_vec(&body) {
600                    Ok(b) => b,
601                    Err(e) => {
602                        let _ = tx.send(Ok(StreamEvent::Error(format!(
603                            "Failed to serialize chat request body: {e}"
604                        ))));
605                        return;
606                    }
607                };
608                let extra_headers = match build_codingplan_headers(&base_url_for_signing, &body_bytes, None) {
609                    Ok(h) => h,
610                    Err(e) => {
611                        let _ = tx.send(Ok(StreamEvent::Error(format!("{e:#}"))));
612                        return;
613                    }
614                };
615                // Snapshot the current token. After a successful auth
616                // refresh below, the shared `api_key` will hold the new
617                // value and the next iteration picks it up here.
618                let current_token = api_key.read().await.clone();
619                let mut request = client
620                    .post(&url)
621                    .header("Authorization", format!("Bearer {}", current_token))
622                    .header("Content-Type", "application/json")
623                    .body(body_bytes);
624                for (name, value) in extra_headers {
625                    request = request.header(name, value);
626                }
627
628                let response = match crate::provider::retry::send_with_retry(request, &policy).await
629                {
630                    Ok(resp) => resp,
631                    Err(e) => {
632                        let _ = tx.send(Ok(StreamEvent::Error(format!(
633                            "Connection failed: {}",
634                            e
635                        ))));
636                        return;
637                    }
638                };
639
640                if !response.status().is_success() {
641                    let status = response.status();
642
643                    // AtomGit-gateway 401: try `refresh_access_token`
644                    // once. On success, write the new token back to the
645                    // shared slot and retry the SAME request (doesn't
646                    // count against MAX_STREAM_ATTEMPTS — that budget is
647                    // for transient body-decode failures, not auth).
648                    if status == reqwest::StatusCode::UNAUTHORIZED
649                        && !auth_retry_used
650                        && crypto::is_atomgit_gateway(&base_url_for_signing)
651                    {
652                        auth_retry_used = true;
653                        // Drain the response body so the connection can
654                        // be returned to the pool cleanly; otherwise
655                        // hyper logs a "connection reset" on drop.
656                        let _ = response.text().await;
657                        let api_key_handle = api_key.clone();
658                        let refresh_outcome =
659                            tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
660                                let auth = get_stored_auth().ok_or_else(|| {
661                                    anyhow::anyhow!("no stored auth — cannot refresh")
662                                })?;
663                                let new_auth = refresh_access_token(&auth)?;
664                                Ok(new_auth.access_token)
665                            })
666                            .await;
667                        if let Ok(Ok(new_token)) = refresh_outcome {
668                            *api_key_handle.write().await = new_token;
669                            // Auth retry doesn't burn a stream-attempt
670                            // slot — mid-stream retries are for
671                            // body-decode failures, which is orthogonal.
672                            attempt -= 1;
673                            continue 'retry;
674                        }
675                        // Fall through to the friendly-error branch
676                        // below; response is already consumed so build
677                        // the error from `status` alone.
678                        let _ = tx.send(Ok(StreamEvent::Error(
679                            t(Msg::ChatAuthExpired).to_string(),
680                        )));
681                        return;
682                    }
683
684                    let resp_url = response.url().to_string();
685                    let body = response.text().await.unwrap_or_default();
686                    // 401 from an AtomGit gateway after the auth-retry
687                    // shot was already used (or refresh-then-still-401):
688                    // the raw server message ("Gitcode auth: token
689                    // rejected (status=401)") is not actionable. Swap
690                    // for an i18n hint pointing at /login. Non-atomgit
691                    // gateways (user-supplied sk-... keys) keep the
692                    // verbatim message so developers see the real
693                    // diagnostic.
694                    let formatted = if status == reqwest::StatusCode::UNAUTHORIZED
695                        && crypto::is_atomgit_gateway(&base_url_for_signing)
696                    {
697                        t(Msg::ChatAuthExpired).to_string()
698                    } else {
699                        let msg = super::extract_error_message(&body);
700                        super::format_http_error(status, &resp_url, &msg)
701                    };
702                    let _ = tx.send(Ok(StreamEvent::Error(formatted)));
703                    return;
704                }
705
706                // Per-attempt local state. Reset on each retry so a partial
707                // first attempt's accumulated bytes don't leak into the
708                // second attempt's parser.
709                let mut byte_buffer: Vec<u8> = Vec::with_capacity(4096);
710                let mut buffer = String::new();
711                let mut byte_stream = response.bytes_stream();
712
713                // ── TEMP RESPONSE WIRE-DUMP (debug only) ──────────────
714                // Pairs with the request dump — captures the raw SSE
715                // bytes coming back so we can verify whether litellm /
716                // proxy returns a standard OpenAI stream format. Files
717                // are named with `_resp` suffix to pair with the
718                // request dump preceding them. Bytes are appended so
719                // multi-chunk streams accumulate into one file.
720                let resp_dump_path: Option<std::path::PathBuf> =
721                    if std::env::var("ATOMCODE_WIRE_DUMP").ok().as_deref() == Some("1") {
722                        std::env::var("HOME").ok().map(|home| {
723                            let dir = std::path::PathBuf::from(home).join(".atomcode/wire-dump");
724                            let _ = std::fs::create_dir_all(&dir);
725                            let ts = std::time::SystemTime::now()
726                                .duration_since(std::time::UNIX_EPOCH)
727                                .map(|d| format!("{}.{:09}", d.as_secs(), d.subsec_nanos()))
728                                .unwrap_or_else(|_| "0".to_string());
729                            dir.join(format!("{}_resp.sse", ts))
730                        })
731                    } else {
732                        None
733                    };
734                // ────────────────────────────────────────────────────────
735                let mut tool_calls: Vec<(String, String, String)> = Vec::new();
736                let mut last_usage: Option<crate::stream::TokenUsage> = None;
737                let mut saw_data_line = false;
738                let mut saw_valid_chunk = false;
739                let mut invalid_chunk_samples: Vec<String> = Vec::new();
740                // Track how much real content the stream actually
741                // produced. Used by the abrupt-close branch below to
742                // distinguish:
743                //   * many chunks + much content → real mid-output
744                //     truncation (table cut, list mid-row, …) → keep
745                //     emitting Done(truncated=true) so the agent's
746                //     "resume where you left off" retry can fire.
747                //   * 0-2 chunks, short text, no tool calls → gateway
748                //     streamed a single error blob like 「请求负载
749                //     过高,请稍后再试」 and hung up. NOT a real
750                //     truncation; emit StreamEvent::Error so the
751                //     agent's rate-limit / failure path takes over
752                //     instead of looping the resume retry.
753                let mut content_chunks: usize = 0;
754                let mut accumulated_content = String::new();
755                // One-shot guard: if the provider's prompt_tokens looks
756                // implausibly low for our content size, log a warning once
757                // per request stream so we don't spam.
758                let mut truncation_warned = false;
759                // Held-back Done event: GitCode-style gateways emit usage
760                // in a chunk AFTER finish_reason. We capture finish_reason
761                // here, keep parsing for the trailing usage chunk, and
762                // emit this on `[DONE]` (or stream end) so token counters
763                // and the truncation detector see real numbers.
764                let mut pending_finish: Option<crate::stream::StreamEvent> = None;
765
766            loop {
767                // 120s idle timeout: if no data arrives for 2 minutes, treat as dead connection.
768                let chunk = match tokio::time::timeout(
769                    std::time::Duration::from_secs(120),
770                    byte_stream.next(),
771                )
772                .await
773                {
774                    Ok(Some(chunk)) => chunk,
775                    Ok(None) => break, // stream ended
776                    Err(_) => {
777                        let _ = tx.send(Ok(StreamEvent::Error(
778                            "Stream timeout: no data received for 120 seconds".to_string(),
779                        )));
780                        return;
781                    }
782                };
783
784                match chunk {
785                    Ok(bytes) => {
786                        // TEMP wire-dump (response side): append raw
787                        // bytes as they arrive so we can inspect the
788                        // exact SSE stream litellm sent back.
789                        if let Some(ref p) = resp_dump_path {
790                            use std::io::Write;
791                            if let Ok(mut f) = std::fs::OpenOptions::new()
792                                .create(true)
793                                .append(true)
794                                .open(p)
795                            {
796                                let _ = f.write_all(&bytes);
797                            }
798                        }
799                        byte_buffer.extend_from_slice(&bytes);
800                    }
801                    Err(e) => {
802                        // Safe-to-retry condition: stream opened but no SSE
803                        // `data:` line was parsed yet. Common with
804                        // self-hosted endpoints that open the response,
805                        // immediately fail to start streaming, and reset
806                        // the chunked body — at this point nothing has
807                        // been committed downstream, so a fresh request
808                        // is equivalent to a first attempt.
809                        if !saw_data_line && attempt < MAX_STREAM_ATTEMPTS {
810                            continue 'retry;
811                        }
812                        let _ = tx.send(Ok(StreamEvent::Error(humanise_stream_error(&e))));
813                        return;
814                    }
815                }
816
817                // Convert bytes to string, keeping incomplete UTF-8 sequences for next chunk
818                let text = match String::from_utf8(byte_buffer.clone()) {
819                    Ok(s) => {
820                        byte_buffer.clear();
821                        s
822                    }
823                    Err(e) => {
824                        let valid_len = e.utf8_error().valid_up_to();
825                        if valid_len == 0 {
826                            // No valid UTF-8 yet, wait for more bytes
827                            continue;
828                        }
829                        let valid = String::from_utf8_lossy(&byte_buffer[..valid_len]).to_string();
830                        byte_buffer = byte_buffer[valid_len..].to_vec();
831                        valid
832                    }
833                };
834
835                buffer.push_str(&text);
836
837                while let Some(pos) = buffer.find('\n') {
838                    let line = buffer[..pos].trim().to_string();
839                    buffer = buffer[pos + 1..].to_string();
840
841                    if line.starts_with("data:") {
842                        saw_data_line = true;
843                        let data = line.strip_prefix("data:").unwrap().trim();
844                        if data == "[DONE]" {
845                            if let Some(usage) = last_usage.take() {
846                                let _ = tx.send(Ok(StreamEvent::Usage(usage)));
847                            }
848                            // Emit the held-back Done from finish_reason if present;
849                            // otherwise default to a non-truncated Done (e.g. providers
850                            // that close the stream with [DONE] but never emit a
851                            // finish_reason field).
852                            let done = pending_finish
853                                .take()
854                                .unwrap_or(StreamEvent::Done { truncated: false });
855                            let _ = tx.send(Ok(done));
856                            return;
857                        }
858                        if let Ok(chunk) = serde_json::from_str::<ChatChunk>(data) {
859                            saw_valid_chunk = true;
860                            // Store usage — don't emit yet. Some providers send cumulative
861                            // usage in multiple chunks; we only want the final value.
862                            if let Some(usage) = &chunk.usage {
863                                // Extract cached tokens from whichever field the provider uses
864                                let cached = usage
865                                    .prompt_cache_hit_tokens
866                                    .or(usage.cached_tokens)
867                                    .or_else(|| {
868                                        usage
869                                            .prompt_tokens_details
870                                            .as_ref()
871                                            .and_then(|d| d.cached_tokens)
872                                    })
873                                    .unwrap_or(0);
874                                let pt = usage.prompt_tokens.unwrap_or(0);
875                                if !truncation_warned {
876                                    if let Some(ratio) =
877                                        check_truncation(body_content_chars, pt)
878                                    {
879                                        truncation_warned = true;
880                                        let msg = format!(
881                                            "Provider may be truncating input on \
882                                             model={}: {} content chars vs {} reported \
883                                             prompt_tokens (ratio {:.1} chars/token; \
884                                             normal mixed-content runs 2-4). If turns \
885                                             spiral, the proxy may be capping context.",
886                                            provider_label,
887                                            body_content_chars,
888                                            pt,
889                                            ratio,
890                                        );
891                                        let _ = tx.send(Ok(StreamEvent::Warning(msg)));
892                                    }
893                                }
894                                last_usage = Some(crate::stream::TokenUsage {
895                                    prompt_tokens: pt,
896                                    completion_tokens: usage.completion_tokens.unwrap_or(0),
897                                    cached_tokens: cached,
898                                });
899                            }
900                            for choice in chunk.choices {
901                                if let Some(content) = choice.delta.content {
902                                    if !content.is_empty() {
903                                        content_chunks += 1;
904                                        accumulated_content.push_str(&content);
905                                        let _ = tx.send(Ok(StreamEvent::Delta(content)));
906                                    }
907                                }
908                                if let Some(reasoning) = choice.delta.reasoning_content {
909                                    if !reasoning.is_empty() {
910                                        let _ = tx.send(Ok(StreamEvent::Reasoning(reasoning)));
911                                    }
912                                }
913                                if let Some(delta_tcs) = &choice.delta.tool_calls {
914                                    for tc in delta_tcs {
915                                        let idx = tc.index.unwrap_or(0);
916                                        // Grow the vec if this is a new tool call index
917                                        while tool_calls.len() <= idx {
918                                            tool_calls.push((
919                                                String::new(),
920                                                String::new(),
921                                                String::new(),
922                                            ));
923                                        }
924                                        let entry = &mut tool_calls[idx];
925                                        if let Some(id) = &tc.id {
926                                            // Some providers (e.g., ModelScope) send empty string id
927                                            // in incremental tool call chunks. Only emit ToolCallStart
928                                            // for non-empty ids.
929                                            if !id.is_empty() {
930                                                entry.0 = id.clone();
931                                                if let Some(func) = &tc.function {
932                                                    entry.1 = func.name.clone().unwrap_or_default();
933                                                }
934                                                let _ = tx.send(Ok(StreamEvent::ToolCallStart {
935                                                    id: entry.0.clone(),
936                                                    name: entry.1.clone(),
937                                                }));
938                                            }
939                                        }
940                                        if let Some(func) = &tc.function {
941                                            if let Some(args) = &func.arguments {
942                                                entry.2.push_str(args);
943                                                let _ = tx.send(Ok(StreamEvent::ToolCallDelta(
944                                                    args.clone(),
945                                                )));
946                                            }
947                                        }
948                                    }
949                                }
950                                if let Some(ref reason) = choice.finish_reason {
951                                    // Don't return here — flush tool_calls + remember the
952                                    // finish_reason, then keep parsing until [DONE]. Some
953                                    // gateways (GitCode litellm proxy on glm-5 confirmed
954                                    // 5/8) send `usage` in a chunk AFTER `finish_reason`,
955                                    // and a previous version of this code returned on
956                                    // finish_reason → usage chunk silently dropped → both
957                                    // the token counters and the truncation detector saw
958                                    // 0 prompt_tokens for entire sessions.
959                                    match reason.as_str() {
960                                        "tool_calls" => {
961                                            for (id, name, args) in &tool_calls {
962                                                let _ = tx.send(Ok(StreamEvent::ToolCallDone(
963                                                    crate::tool::ToolCall {
964                                                        id: id.clone(),
965                                                        name: name.clone(),
966                                                        arguments: args.clone(),
967                                                    },
968                                                )));
969                                            }
970                                            tool_calls.clear();
971                                            pending_finish =
972                                                Some(StreamEvent::Done { truncated: false });
973                                        }
974                                        "length" | "max_tokens" => {
975                                            // Model hit token limit — flush partial tool
976                                            // calls so downstream sees what the model was
977                                            // attempting. (Args may be malformed;
978                                            // `repair_tool_args` + write.rs friendly errors
979                                            // handle that.)
980                                            for (id, name, args) in &tool_calls {
981                                                let _ = tx.send(Ok(StreamEvent::ToolCallDone(
982                                                    crate::tool::ToolCall {
983                                                        id: id.clone(),
984                                                        name: name.clone(),
985                                                        arguments: args.clone(),
986                                                    },
987                                                )));
988                                            }
989                                            tool_calls.clear();
990                                            pending_finish =
991                                                Some(StreamEvent::Done { truncated: true });
992                                        }
993                                        "stop" | _ => {
994                                            pending_finish =
995                                                Some(StreamEvent::Done { truncated: false });
996                                        }
997                                    }
998                                }
999                            }
1000                        } else if invalid_chunk_samples.len() < 3 && !data.is_empty() {
1001                            invalid_chunk_samples.push(sample_for_error(data));
1002                        }
1003                    }
1004                }
1005            }
1006
1007            let tail = buffer.trim();
1008            if !tail.is_empty() {
1009                if let Some(events) = parse_nonstream_response(tail) {
1010                    for event in events {
1011                        let _ = tx.send(Ok(event));
1012                    }
1013                    return;
1014                }
1015            }
1016
1017            if saw_data_line && !saw_valid_chunk {
1018                let detail = if invalid_chunk_samples.is_empty() {
1019                    "no chunk could be parsed".to_string()
1020                } else {
1021                    format!("samples: {}", invalid_chunk_samples.join(" | "))
1022                };
1023                let _ = tx.send(Ok(StreamEvent::Error(format!(
1024                    "Provider returned an unparseable OpenAI-compatible stream ({})",
1025                    detail
1026                ))));
1027                return;
1028            }
1029
1030            if !tail.is_empty() {
1031                let _ = tx.send(Ok(StreamEvent::Error(format!(
1032                    "Provider returned a non-SSE response AtomCode could not parse: {}",
1033                    sample_for_error(tail)
1034                ))));
1035                return;
1036            }
1037
1038                // ── Stream ended without close marker ──
1039                // Reaching here means we parsed valid SSE chunks but the
1040                // stream's `bytes_stream.next()` returned `Ok(None)` (clean
1041                // close at TCP/HTTP level) WITHOUT either:
1042                //   a. a `data: [DONE]` line (handled at line ~519, returns
1043                //      with truncated=false), or
1044                //   b. a `finish_reason` of `stop` / `length` / `tool_calls`
1045                //      (handled inline in the chunk parser around line ~610,
1046                //      returns with the appropriate truncated flag).
1047                //
1048                // Observed three times across May 2026 atomgr/atomcode
1049                // sessions on the self-hosted glm-5.1 endpoint:
1050                //   - 5/4 21:21 Turn 23 — `error decoding response body` (Err
1051                //     path, separately fixed by mid-stream retry).
1052                //   - 5/5 10:06 Turn 10 — text response stopped at "1.\n"
1053                //     mid-list, no close marker (this path).
1054                //   - 5/5 19:37 Turn 72-73 — markdown table truncated
1055                //     mid-row, no close marker (this path).
1056                //
1057                // Pre-fix this branch emitted `Done { truncated: false }`,
1058                // making the agent loop treat the partial output as a
1059                // complete response and `finish_turn(Natural)` immediately.
1060                // The user saw a cut-off table / list with no error, no
1061                // retry, and no indication that anything went wrong.
1062                //
1063                // Post-fix (this commit):
1064                //   1. Flush any in-flight tool calls so partial-args don't
1065                //      silently disappear (mirrors the `length` branch's
1066                //      handling at line ~622).
1067                //   2. Emit a TextDelta marker so the user (and datalog) can
1068                //      see why the response was cut. Goes through the
1069                //      normal stream_filter path; doesn't pollute model
1070                //      context with control sequences.
1071                //   3. Emit `Done { truncated: true }` so the agent loop's
1072                //      existing retry-with-resume path (`agent/mod.rs:1854`,
1073                //      `if truncated && retry_count < 1`) injects the
1074                //      "Output limit hit. … resume where you left off"
1075                //      hint and triggers a continuation turn.
1076                // If finish_reason had already arrived (we held the Done
1077                // back waiting for trailing usage), don't downgrade it to
1078                // a truncated=true close — the model finished cleanly and
1079                // the stream just lacked a [DONE] marker. Flush any
1080                // buffered usage first so token counters are honest.
1081                if let Some(usage) = last_usage.take() {
1082                    let _ = tx.send(Ok(StreamEvent::Usage(usage)));
1083                }
1084                if let Some(done) = pending_finish.take() {
1085                    let _ = tx.send(Ok(done));
1086                    return;
1087                }
1088
1089                // Abrupt close discriminator: if the model never made
1090                // tool-call progress AND the body arrived as a single
1091                // burst (≤ 2 content chunks), this wasn't a real
1092                // truncation — gateways like GitCode's litellm proxy
1093                // stream a single error blob (「请求负载过高,请稍后
1094                // 再试」 / a verbose `litellm.InternalServerError` JSON
1095                // envelope) and slam the connection closed without a
1096                // [DONE] marker. Promoting that to `truncated=true`
1097                // makes the agent inject "resume where you left off"
1098                // and retry, which renders the SAME error a second
1099                // time (see issue: GLM-5.1 网关限流双重渲染 /
1100                // LiteLLM 429 cooldown_list 双重渲染).
1101                // Diverting to `StreamEvent::Error` instead lets the
1102                // agent's `is_rate_limited` retry path (with 3-30s
1103                // backoff) handle it correctly — or, if it's an
1104                // unfamiliar error string, surface it once and stop.
1105                //
1106                // Discriminator is `content_chunks <= 2` alone: real
1107                // streamed completions emit many small deltas (tens
1108                // to hundreds of chunks), while gateway errors arrive
1109                // as 1-2 large chunks regardless of payload size
1110                // (Chinese 10-80-char banners or 700+-char LiteLLM
1111                // JSON envelopes both qualify). The earlier ≤ 200
1112                // char cap let the LiteLLM JSON shape slip through
1113                // to the truncated-retry path and caused the double
1114                // render. The real risk — misclassifying a 1-chunk
1115                // legit reply — is mitigated by the fact that
1116                // successful completions virtually always emit
1117                // `[DONE]`; reaching this branch already means the
1118                // stream ended anomalously.
1119                let trimmed = accumulated_content.trim();
1120                let looks_like_gateway_error =
1121                    tool_calls.is_empty() && content_chunks <= 2 && !trimmed.is_empty();
1122                if looks_like_gateway_error {
1123                    let _ = tx.send(Ok(StreamEvent::Error(trimmed.to_string())));
1124                    return;
1125                }
1126
1127                for (id, name, args) in &tool_calls {
1128                    let _ = tx.send(Ok(StreamEvent::ToolCallDone(
1129                        crate::tool::ToolCall {
1130                            id: id.clone(),
1131                            name: name.clone(),
1132                            arguments: args.clone(),
1133                        },
1134                    )));
1135                }
1136                tool_calls.clear();
1137                let _ = tx.send(Ok(StreamEvent::Delta(
1138                    "\n[stream ended without close marker — response above may be incomplete]\n"
1139                        .to_string(),
1140                )));
1141                let _ = tx.send(Ok(StreamEvent::Done { truncated: true }));
1142                return;
1143            }
1144        });
1145
1146        Ok(Box::pin(
1147            tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
1148        ))
1149    }
1150
1151    fn model_name(&self) -> &str {
1152        &self.model
1153    }
1154
1155    fn reasoning_history_policy(&self) -> ReasoningPolicy {
1156        // Explicit user override wins over the name/url heuristic so a new
1157        // provider quirk can be worked around via config.toml without a
1158        // code change.
1159        if let Some(p) = self.reasoning_history_override {
1160            return p;
1161        }
1162        Self::derive_reasoning_policy(&self.model, &self.base_url)
1163    }
1164}
1165
/// Repair common JSON issues in tool call arguments from weak models.
///
/// Repairs applied, in order:
/// 1. Surrounding whitespace and a markdown code fence (the opening fence
///    line and a trailing "```") are stripped.
/// 2. Trailing commas directly before a closing `}` or `]` are dropped,
///    including when whitespace/newlines sit between the comma and the
///    closer.
/// 3. Input that does not start with `{` or `[` is wrapped in `{ … }`.
/// 4. One `}` is appended for every `{` that has no matching `}`.
///
/// Steps 2 and 4 only look at characters *outside* JSON string literals
/// (tracking `"` and backslash escapes). Braces or `,}` sequences inside a
/// string value — e.g. source code passed as an argument — are left
/// untouched and never counted, so already-balanced input is not corrupted.
///
/// The result is best-effort and is not guaranteed to be valid JSON; in
/// particular an unterminated string literal is left unterminated.
fn repair_tool_args(s: &str) -> String {
    let mut r = s.trim().to_string();

    // Remove markdown code fences
    if r.starts_with("```") {
        r = r.lines().skip(1).collect::<Vec<_>>().join("\n");
    }
    if let Some(unfenced) = r.strip_suffix("```") {
        r = unfenced.trim().to_string();
    }

    // Single string-aware pass: drop trailing commas and count braces that
    // are structural (i.e. not part of a string literal).
    let mut cleaned = String::with_capacity(r.len() + 2);
    let mut in_string = false;
    let mut escaped = false;
    let mut open = 0usize;
    let mut close = 0usize;
    for (i, c) in r.char_indices() {
        if in_string {
            cleaned.push(c);
            if escaped {
                // Character following a backslash never terminates the string.
                escaped = false;
            } else if c == '\\' {
                escaped = true;
            } else if c == '"' {
                in_string = false;
            }
            continue;
        }
        match c {
            '"' => {
                in_string = true;
                cleaned.push(c);
            }
            ',' => {
                // A comma is "trailing" when nothing but whitespace or
                // further commas separates it from a closing bracket.
                let next = r[i + 1..]
                    .chars()
                    .find(|ch| !ch.is_whitespace() && *ch != ',');
                if !matches!(next, Some('}') | Some(']')) {
                    cleaned.push(c);
                }
            }
            '{' => {
                open += 1;
                cleaned.push(c);
            }
            '}' => {
                close += 1;
                cleaned.push(c);
            }
            _ => cleaned.push(c),
        }
    }
    r = cleaned;

    // Ensure wrapped in braces. The wrapper adds one `{` and one `}`, so it
    // does not change the open/close deficit computed above.
    if !r.starts_with('{') && !r.starts_with('[') {
        r = format!("{{{}}}", r);
    }

    // Balance braces
    for _ in 0..open.saturating_sub(close) {
        r.push('}');
    }

    r
}
1201
/// Turn a user-supplied base URL into the full chat-completions endpoint.
///
/// Trailing slashes are removed first; `/chat/completions` is then appended
/// unless the URL already ends with it. Examples:
///   - "https://api.example.com/v1/" → "https://api.example.com/v1/chat/completions"
///   - "https://api.example.com/v1/chat/completions" → unchanged
///   - "https://api.example.com" → "https://api.example.com/chat/completions"
///     (no `/v1` segment is inserted)
fn normalize_base_url(base: &str) -> String {
    const ENDPOINT: &str = "/chat/completions";

    let trimmed = base.trim_end_matches('/');
    let mut url = String::with_capacity(trimmed.len() + ENDPOINT.len());
    url.push_str(trimmed);
    if !trimmed.ends_with(ENDPOINT) {
        url.push_str(ENDPOINT);
    }
    url
}
1215
1216fn parse_nonstream_response(body: &str) -> Option<Vec<StreamEvent>> {
1217    let response: ChatCompletionResponse = serde_json::from_str(body).ok()?;
1218    let mut events = Vec::new();
1219
1220    if let Some(usage) = response.usage {
1221        let cached = usage
1222            .prompt_cache_hit_tokens
1223            .or(usage.cached_tokens)
1224            .or_else(|| {
1225                usage
1226                    .prompt_tokens_details
1227                    .as_ref()
1228                    .and_then(|d| d.cached_tokens)
1229            })
1230            .unwrap_or(0);
1231        events.push(StreamEvent::Usage(crate::stream::TokenUsage {
1232            prompt_tokens: usage.prompt_tokens.unwrap_or(0),
1233            completion_tokens: usage.completion_tokens.unwrap_or(0),
1234            cached_tokens: cached,
1235        }));
1236    }
1237
1238    for choice in response.choices {
1239        if let Some(message) = choice.message {
1240            if let Some(content) = message.content {
1241                if !content.is_empty() {
1242                    events.push(StreamEvent::Delta(content));
1243                }
1244            }
1245            if let Some(reasoning) = message.reasoning_content {
1246                if !reasoning.is_empty() {
1247                    events.push(StreamEvent::Reasoning(reasoning));
1248                }
1249            }
1250        }
1251
1252        let truncated = matches!(
1253            choice.finish_reason.as_deref(),
1254            Some("length") | Some("max_tokens")
1255        );
1256        events.push(StreamEvent::Done { truncated });
1257    }
1258
1259    if events.is_empty() {
1260        None
1261    } else {
1262        Some(events)
1263    }
1264}
1265
/// Produces a short single-line excerpt of `s` for use in error text.
///
/// Every newline is replaced by the two characters `\n`; if the result is
/// longer than 160 characters it is cut to its first 160 characters and
/// `...` is appended.
fn sample_for_error(s: &str) -> String {
    const LIMIT: usize = 160;

    let flattened = s.replace('\n', "\\n");
    // The char at index LIMIT exists only when the text exceeds the limit;
    // its byte offset is where the excerpt has to end.
    match flattened.char_indices().nth(LIMIT) {
        Some((cut, _)) => format!("{}...", &flattened[..cut]),
        None => flattened,
    }
}
1274
1275/// Translate a `reqwest::Error` from the streaming body into something a
1276/// non-engineer user can act on. The bare `Display` for these errors is
1277/// shaped for HTTP-protocol context ("error decoding response body",
1278/// "operation timed out") and lands in the chat as gibberish — users
1279/// can't tell whether to retry, switch providers, or wait. Three buckets:
1280///
1281/// 1. `is_decode()` — the most common self-hosted-endpoint failure: the
1282///    server cut the chunked body mid-flight (worker timeout, OOM,
1283///    upstream proxy reset). Recoverable by resending; tell the user so.
1284/// 2. `is_timeout()` — request-level timeout. Same recovery signal.
1285/// 3. `is_connect()` — TCP connect failed late (rare mid-stream, but
1286///    possible on connection-pool churn). Recoverable.
1287/// Everything else falls through to the bare error text.
1288pub(crate) fn humanise_stream_error(e: &reqwest::Error) -> String {
1289    if e.is_decode() {
1290        format!(
1291            "Endpoint terminated the response stream mid-flight ({}). \
1292             The provider may have hit a worker timeout or upstream-proxy \
1293             read limit on a long generation. Try resending the message; \
1294             if it recurs, increase the endpoint's read/write timeouts \
1295             or split the request into smaller chunks.",
1296            e
1297        )
1298    } else if e.is_timeout() {
1299        format!(
1300            "Stream timeout ({}). The provider didn't deliver chunks \
1301             within the configured window. Try resending or check provider \
1302             status.",
1303            e
1304        )
1305    } else if e.is_connect() {
1306        format!(
1307            "Connection lost mid-stream ({}). Try resending; check \
1308             network reachability if it persists.",
1309            e
1310        )
1311    } else {
1312        format!("Stream error: {}", e)
1313    }
1314}
1315
1316/// Sum of every message's `content` length plus every tool_call's
1317/// `arguments` length. Used as the denominator for the
1318/// chars/prompt_tokens ratio that flags a silently-truncating proxy.
1319/// We deliberately ignore JSON keys/braces — those are constant overhead
1320/// across all bodies and would dilute the signal.
1321fn sum_message_content_chars(body: &serde_json::Value) -> usize {
1322    let mut total = 0usize;
1323    let Some(msgs) = body.get("messages").and_then(|m| m.as_array()) else {
1324        return 0;
1325    };
1326    for m in msgs {
1327        if let Some(s) = m.get("content").and_then(|c| c.as_str()) {
1328            total = total.saturating_add(s.len());
1329        } else if let Some(arr) = m.get("content").and_then(|c| c.as_array()) {
1330            // Vision multipart content: sum text fragments only (image
1331            // payloads are URL-or-base64 strings the model doesn't read
1332            // as text tokens, so counting them inflates the ratio).
1333            for part in arr {
1334                if let Some(s) = part.get("text").and_then(|t| t.as_str()) {
1335                    total = total.saturating_add(s.len());
1336                }
1337            }
1338        }
1339        if let Some(tcs) = m.get("tool_calls").and_then(|t| t.as_array()) {
1340            for tc in tcs {
1341                if let Some(args) = tc
1342                    .get("function")
1343                    .and_then(|f| f.get("arguments"))
1344                    .and_then(|a| a.as_str())
1345                {
1346                    total = total.saturating_add(args.len());
1347                }
1348            }
1349        }
1350    }
1351    total
1352}
1353
/// Flags a request whose content-size-per-prompt-token ratio is implausibly
/// high, which suggests the provider silently truncated the input and
/// reported token usage for the shortened view.
///
/// Returns `Some(ratio)` only when `content_chars / prompt_tokens` is
/// strictly greater than 6.0. Returns `None` when the ratio is at or below
/// that threshold, when `content_chars` is under 4 000 (too small for the
/// ratio to be meaningful), or when `prompt_tokens` is 0 (nothing to divide
/// by).
fn check_truncation(content_chars: usize, prompt_tokens: usize) -> Option<f64> {
    const MIN_CONTENT: usize = 4_000;
    const SUSPICIOUS_RATIO: f64 = 6.0;

    let comparable = content_chars >= MIN_CONTENT && prompt_tokens != 0;
    if !comparable {
        return None;
    }
    Some(content_chars as f64 / prompt_tokens as f64).filter(|ratio| *ratio > SUSPICIOUS_RATIO)
}
1374
1375#[cfg(test)]
1376mod tests {
1377    use super::{
1378        check_truncation, parse_nonstream_response, sample_for_error, sum_message_content_chars,
1379        OpenAiProvider, ReasoningPolicy,
1380    };
1381    use crate::conversation::message::{ImagePart, Message, MessageContent, Role};
1382    use crate::stream::StreamEvent;
1383
1384    /// Wire shape for `MessageContent::MultiPart`: must match OpenAI's
1385    /// vision schema exactly — `role: user`, `content: [...]` array,
1386    /// each block tagged with `type` ("image_url" or "text"). Order
1387    /// is image(s) first, text second. The PR added the multipart code
1388    /// path but no test for the wire output; without this regression
1389    /// guard a future field-rename or order-flip would silently break
1390    /// every vision-capable provider.
1391    #[test]
1392    fn multipart_serialises_to_openai_vision_schema() {
1393        let msg = Message {
1394            role: Role::User,
1395            content: MessageContent::MultiPart {
1396                text: Some("describe this".to_string()),
1397                images: vec![ImagePart {
1398                    media_type: "image/png".to_string(),
1399                    data: "AAAA".to_string(),
1400                }],
1401            },
1402        };
1403        let out = OpenAiProvider::format_messages(&[msg], ReasoningPolicy::Exclude, true);
1404        assert_eq!(out.len(), 1, "one message in, one out");
1405        let m = &out[0];
1406        assert_eq!(m["role"], "user");
1407        let content = m["content"].as_array().expect("content must be an array");
1408        assert_eq!(content.len(), 2, "image + text = 2 blocks");
1409        // Block 0: image, must have exactly `type` and `image_url`.
1410        assert_eq!(content[0]["type"], "image_url");
1411        assert_eq!(
1412            content[0]["image_url"]["url"],
1413            "data:image/png;base64,AAAA"
1414        );
1415        assert!(content[0].get("text").is_none(), "image block must not have text field");
1416        // Block 1: text, must use `type: text` + `text: <string>`.
1417        assert_eq!(content[1]["type"], "text");
1418        assert_eq!(content[1]["text"], "describe this");
1419    }
1420
1421    /// Multi-image variant: all images come before the text block, in
1422    /// the order they were attached.
1423    #[test]
1424    fn multipart_preserves_image_order_then_text() {
1425        let msg = Message {
1426            role: Role::User,
1427            content: MessageContent::MultiPart {
1428                text: Some("compare".to_string()),
1429                images: vec![
1430                    ImagePart { media_type: "image/png".into(), data: "FIRST".into() },
1431                    ImagePart { media_type: "image/jpeg".into(), data: "SECOND".into() },
1432                ],
1433            },
1434        };
1435        let out = OpenAiProvider::format_messages(&[msg], ReasoningPolicy::Exclude, true);
1436        let content = out[0]["content"].as_array().unwrap();
1437        assert_eq!(content.len(), 3);
1438        assert_eq!(content[0]["image_url"]["url"], "data:image/png;base64,FIRST");
1439        assert_eq!(content[1]["image_url"]["url"], "data:image/jpeg;base64,SECOND");
1440        assert_eq!(content[2]["type"], "text");
1441        assert_eq!(content[2]["text"], "compare");
1442    }
1443
1444    /// Image-only multipart (no caption): content array contains just
1445    /// the image block, no empty trailing text block.
1446    #[test]
1447    fn multipart_without_text_omits_text_block() {
1448        let msg = Message {
1449            role: Role::User,
1450            content: MessageContent::MultiPart {
1451                text: None,
1452                images: vec![ImagePart { media_type: "image/png".into(), data: "X".into() }],
1453            },
1454        };
1455        let out = OpenAiProvider::format_messages(&[msg], ReasoningPolicy::Exclude, true);
1456        let content = out[0]["content"].as_array().unwrap();
1457        assert_eq!(content.len(), 1, "single image block, no text block");
1458        assert_eq!(content[0]["type"], "image_url");
1459    }
1460
1461    /// Regression: the user pasted an image with a vision-capable model
1462    /// (Claude/Opus), got a reply, then ran `/model` to switch to GLM-5.1
1463    /// (text-only) and tried to send a follow-up. The conversation still
1464    /// carried the historical `MultiPart` user turn; serialising it
1465    /// against GLM-5.1's text-only schema sent `content: [...]` to the
1466    /// upstream which rejected with `ModelArts.81001 message[N].content[0]
1467    /// has invalid field(s): text, type`. The provider must gracefully
1468    /// degrade `MultiPart` → text-only string when `supports_vision = false`,
1469    /// preserving the user's caption (with our `[Image #N]` marker still
1470    /// inside) but stripping the image bytes the wire schema can't
1471    /// represent.
1472    #[test]
1473    fn multipart_degrades_to_text_when_target_is_text_only() {
1474        let history = Message {
1475            role: Role::User,
1476            content: MessageContent::MultiPart {
1477                text: Some("[Image #1] 这是什么图啊".into()),
1478                images: vec![ImagePart { media_type: "image/png".into(), data: "AAAA".into() }],
1479            },
1480        };
1481        let out = OpenAiProvider::format_messages(&[history], ReasoningPolicy::Exclude, false);
1482        assert_eq!(out.len(), 1);
1483        let m = &out[0];
1484        assert_eq!(m["role"], "user");
1485        // Content must be a flat string, NOT an array — anything else is
1486        // a 400 against text-only proxies (ModelArts, ZhipuAI, etc.).
1487        assert!(
1488            m["content"].is_string(),
1489            "text-only target must receive content as a string, got: {}",
1490            m["content"]
1491        );
1492        let content = m["content"].as_str().unwrap();
1493        assert!(
1494            content.contains("这是什么图啊"),
1495            "user's caption must survive degradation: {:?}",
1496            content
1497        );
1498        // No image_url block leakage.
1499        assert!(
1500            !content.contains("data:image"),
1501            "image bytes must not appear in degraded payload: {:?}",
1502            content
1503        );
1504    }
1505
1506    /// When `MultiPart` had no text at all (image-only paste, no caption)
1507    /// and the target is text-only, the degraded payload must still be
1508    /// non-empty — empty user content is rejected by some proxies (e.g.
1509    /// "messages must contain a non-empty content"). Use a placeholder
1510    /// so the conversation flow stays valid.
1511    #[test]
1512    fn multipart_text_only_target_uses_placeholder_when_caption_empty() {
1513        let history = Message {
1514            role: Role::User,
1515            content: MessageContent::MultiPart {
1516                text: None,
1517                images: vec![ImagePart { media_type: "image/png".into(), data: "X".into() }],
1518            },
1519        };
1520        let out = OpenAiProvider::format_messages(&[history], ReasoningPolicy::Exclude, false);
1521        let content = out[0]["content"].as_str().expect("string content");
1522        assert!(!content.is_empty(), "must be non-empty placeholder");
1523    }
1524
1525    #[test]
1526    fn parses_nonstream_text_response() {
1527        let body = r#"{
1528          "choices": [
1529            {
1530              "message": { "content": "hello" },
1531              "finish_reason": "stop"
1532            }
1533          ],
1534          "usage": { "prompt_tokens": 11, "completion_tokens": 3 }
1535        }"#;
1536
1537        let events = parse_nonstream_response(body).expect("should parse non-stream response");
1538        assert!(matches!(events[0], StreamEvent::Usage(_)));
1539        assert!(matches!(events[1], StreamEvent::Delta(ref s) if s == "hello"));
1540        assert!(matches!(events[2], StreamEvent::Done { truncated: false }));
1541    }
1542
1543    #[test]
1544    fn parses_nonstream_reasoning_only_response() {
1545        let body = r#"{
1546          "choices": [
1547            {
1548              "message": { "reasoning_content": "thinking" },
1549              "finish_reason": "length"
1550            }
1551          ]
1552        }"#;
1553
1554        let events = parse_nonstream_response(body).expect("should parse non-stream response");
1555        assert!(matches!(events[0], StreamEvent::Reasoning(ref s) if s == "thinking"));
1556        assert!(matches!(events[1], StreamEvent::Done { truncated: true }));
1557    }
1558
1559    #[test]
1560    fn sample_for_error_flattens_newlines() {
1561        assert_eq!(sample_for_error("a\nb"), "a\\nb");
1562    }
1563
1564    // ── ReasoningPolicy: model / base_url routing ──
1565
1566    #[test]
1567    fn reasoning_policy_moonshot_kimi_routes_to_include() {
1568        use super::{OpenAiProvider, ReasoningPolicy};
1569        assert_eq!(
1570            OpenAiProvider::derive_reasoning_policy(
1571                "kimi-k2-thinking",
1572                "https://api.moonshot.cn/v1"
1573            ),
1574            ReasoningPolicy::Include,
1575        );
1576        assert_eq!(
1577            OpenAiProvider::derive_reasoning_policy("kimi-k2.6", "https://api.kimi.com/v1"),
1578            ReasoningPolicy::Include,
1579        );
1580    }
1581
1582    #[test]
1583    fn reasoning_policy_deepseek_reasoner_routes_to_exclude() {
1584        use super::{OpenAiProvider, ReasoningPolicy};
1585        // DeepSeek-R1 rejects the request if reasoning_content is echoed back.
1586        assert_eq!(
1587            OpenAiProvider::derive_reasoning_policy(
1588                "deepseek-reasoner",
1589                "https://api.deepseek.com/v1"
1590            ),
1591            ReasoningPolicy::Exclude,
1592        );
1593        assert_eq!(
1594            OpenAiProvider::derive_reasoning_policy("deepseek-r1", "https://api.deepseek.com/v1"),
1595            ReasoningPolicy::Exclude,
1596        );
1597    }
1598
1599    #[test]
1600    fn reasoning_policy_deepseek_v4_routes_to_include() {
1601        use super::{OpenAiProvider, ReasoningPolicy};
1602        // DeepSeek V4 thinking mode requires reasoning_content echoed back on
1603        // assistant tool_call messages — opposite of V3/R1.
1604        assert_eq!(
1605            OpenAiProvider::derive_reasoning_policy("deepseek-v4-pro", "https://api.deepseek.com"),
1606            ReasoningPolicy::Include,
1607        );
1608        assert_eq!(
1609            OpenAiProvider::derive_reasoning_policy("deepseek-v4", "https://api.deepseek.com"),
1610            ReasoningPolicy::Include,
1611        );
1612    }
1613
1614    #[test]
1615    fn reasoning_history_config_override_wins_over_heuristic() {
1616        // `reasoning_history = "exclude"` forces Exclude even on a model that
1617        // the heuristic would route to Include (deepseek-v4-pro).
1618        use super::OpenAiProvider;
1619        use crate::config::provider::ProviderConfig;
1620        use crate::provider::{LlmProvider, ReasoningPolicy};
1621        let cfg = ProviderConfig {
1622            provider_type: "openai".into(),
1623            api_key: Some("sk-test".into()),
1624            model: "deepseek-v4-pro".into(),
1625            base_url: Some("https://api.deepseek.com".into()),
1626            system_prompt: None,
1627            user_agent: None,
1628            context_window: 128_000,
1629            max_tokens: None,
1630            thinking_type: None,
1631            thinking_keep: None,
1632            reasoning_history: Some("exclude".into()),
1633            thinking_enabled: None,
1634            thinking_budget: None,
1635            skip_tls_verify: false,
1636            ephemeral: false,
1637
1638};
1639        let p = OpenAiProvider::new(&cfg).expect("provider builds");
1640        assert_eq!(p.reasoning_history_policy(), ReasoningPolicy::Exclude);
1641
1642        // And vice versa: "include" on a plain OpenAI model (heuristic = Exclude)
1643        // forces Include — lets users unblock new providers without a code change.
1644        let cfg_inc = ProviderConfig {
1645            model: "gpt-4o".into(),
1646            base_url: Some("https://api.openai.com/v1".into()),
1647            reasoning_history: Some("include".into()),
1648            ..cfg
1649        };
1650        let p2 = OpenAiProvider::new(&cfg_inc).expect("provider builds");
1651        assert_eq!(p2.reasoning_history_policy(), ReasoningPolicy::Include);
1652    }
1653
1654    #[test]
1655    fn reasoning_history_config_invalid_value_fails_fast() {
1656        // Typos in config should surface at load time with a clear error,
1657        // not a silent policy-mismatch 400 mid-turn.
1658        use super::OpenAiProvider;
1659        use crate::config::provider::ProviderConfig;
1660        let cfg = ProviderConfig {
1661            provider_type: "openai".into(),
1662            api_key: Some("sk-test".into()),
1663            model: "gpt-4o".into(),
1664            base_url: Some("https://api.openai.com/v1".into()),
1665            system_prompt: None,
1666            user_agent: None,
1667            context_window: 128_000,
1668            max_tokens: None,
1669            thinking_type: None,
1670            thinking_keep: None,
1671            reasoning_history: Some("always".into()),
1672            thinking_enabled: None,
1673            thinking_budget: None,
1674            skip_tls_verify: false,
1675            ephemeral: false,
1676
1677};
1678        let err = match OpenAiProvider::new(&cfg) {
1679            Err(e) => e,
1680            Ok(_) => panic!("bad reasoning_history value must reject"),
1681        };
1682        let msg = err.to_string();
1683        assert!(
1684            msg.contains("reasoning_history") && msg.contains("always"),
1685            "error must name the bad field and value, got: {msg}"
1686        );
1687    }
1688
1689    #[test]
1690    fn reasoning_policy_default_is_exclude() {
1691        use super::{OpenAiProvider, ReasoningPolicy};
1692        // Unknown OpenAI-compatible endpoint → safe default: don't emit.
1693        assert_eq!(
1694            OpenAiProvider::derive_reasoning_policy("gpt-4o", "https://api.openai.com/v1"),
1695            ReasoningPolicy::Exclude,
1696        );
1697        assert_eq!(
1698            OpenAiProvider::derive_reasoning_policy("some-custom-model", "https://example.com/v1"),
1699            ReasoningPolicy::Exclude,
1700        );
1701    }
1702
1703    // ── format_messages: reasoning_content emission per policy ──
1704
1705    fn atc_message(reasoning: Option<&str>) -> crate::conversation::message::Message {
1706        use crate::conversation::message::{Message, MessageContent, Role};
1707        use crate::tool::ToolCall;
1708        Message {
1709            role: Role::Assistant,
1710            content: MessageContent::AssistantWithToolCalls {
1711                text: Some("ok".into()),
1712                tool_calls: vec![ToolCall {
1713                    id: "c1".into(),
1714                    name: "bash".into(),
1715                    arguments: "{}".into(),
1716                }],
1717                reasoning_content: reasoning.map(|s| s.to_string()),
1718                thinking_blocks: Vec::new(),
1719            },
1720        }
1721    }
1722
1723    #[test]
1724    fn format_messages_include_with_some_reasoning_emits_field() {
1725        use super::{OpenAiProvider, ReasoningPolicy};
1726        let msgs = vec![atc_message(Some("thinking text"))];
1727        let out = OpenAiProvider::format_messages(&msgs, ReasoningPolicy::Include, true);
1728        assert_eq!(out.len(), 1);
1729        assert_eq!(out[0]["reasoning_content"], "thinking text");
1730    }
1731
1732    #[test]
1733    fn placeholder_send_side_matches_shared_constant() {
1734        // `TurnRunner::Done` skips reasoning→text promotion when the
1735        // accumulated reasoning_buf equals exactly this placeholder.
1736        // Send-side (format_messages, three call sites) MUST emit the
1737        // same byte string — otherwise a buggy gateway echoing it
1738        // back would slip past the guard and cause silent
1739        // "(no reasoning recorded) · Nailed it" stops. Pin the
1740        // contract by routing both sides through one constant.
1741        use super::{OpenAiProvider, ReasoningPolicy};
1742        use crate::provider::REASONING_PLACEHOLDER;
1743        let msgs = vec![atc_message(None)];
1744        let out = OpenAiProvider::format_messages(&msgs, ReasoningPolicy::Include, true);
1745        assert_eq!(
1746            out[0]["reasoning_content"].as_str().unwrap(),
1747            REASONING_PLACEHOLDER,
1748        );
1749    }
1750
1751    #[test]
1752    fn format_messages_include_with_none_reasoning_emits_placeholder() {
1753        // Kimi's check is "field missing" (empty ok). DeepSeek V4's check is
1754        // stricter — rejects an empty string on tool_call messages. When we
1755        // have no stored reasoning (cross-provider session, old jsonl before
1756        // capture was wired, non-thinking model that tool-called anyway), emit
1757        // a short non-empty placeholder so BOTH providers accept the message.
1758        use super::{OpenAiProvider, ReasoningPolicy};
1759        let msgs = vec![atc_message(None)];
1760        let out = OpenAiProvider::format_messages(&msgs, ReasoningPolicy::Include, true);
1761        let rc = out[0]["reasoning_content"].as_str().unwrap();
1762        assert!(
1763            !rc.is_empty(),
1764            "placeholder must be non-empty for DeepSeek V4"
1765        );
1766    }
1767
1768    #[test]
1769    fn format_messages_include_assistant_text_emits_reasoning_content() {
1770        // DeepSeek V4 tool-call round contract (per official docs): in every
1771        // subsequent request, ALL reasoning_content from the tool-call turn
1772        // must be echoed — including the reasoning for the FINAL TEXT answer
1773        // (思维链1.3 → 回答1 in the docs diagram). Our Text variant doesn't
1774        // persist per-turn reasoning, so under Include we emit a placeholder.
1775        // Regression for the "second prompt 400" bug.
1776        use super::{OpenAiProvider, ReasoningPolicy};
1777        use crate::conversation::message::{Message, MessageContent, Role};
1778        let msgs = vec![Message {
1779            role: Role::Assistant,
1780            content: MessageContent::Text("当前系统时间是 …".into()),
1781        }];
1782        let out = OpenAiProvider::format_messages(&msgs, ReasoningPolicy::Include, true);
1783        assert_eq!(out.len(), 1);
1784        let rc = out[0]["reasoning_content"].as_str();
1785        assert!(
1786            rc.map_or(false, |s| !s.is_empty()),
1787            "assistant Text under Include must carry a non-empty reasoning_content, got: {}",
1788            out[0]
1789        );
1790
1791        // Under Exclude (V3/default) the key must NOT appear on Text — sending
1792        // it would regress V3 R1 which rejects any reasoning_content echo.
1793        let out_ex = OpenAiProvider::format_messages(&msgs, ReasoningPolicy::Exclude, true);
1794        assert!(
1795            out_ex[0]
1796                .as_object()
1797                .unwrap()
1798                .get("reasoning_content")
1799                .is_none(),
1800            "Exclude must not add reasoning_content to assistant Text, got: {}",
1801            out_ex[0]
1802        );
1803    }
1804
1805    #[test]
1806    fn format_messages_include_with_empty_string_reasoning_emits_placeholder() {
1807        // Same reason as `_none_reasoning_emits_placeholder`: an empty-string
1808        // reasoning (either stored as "" or decayed from serde) must still be
1809        // replaced with the non-empty placeholder before sending.
1810        use super::{OpenAiProvider, ReasoningPolicy};
1811        let msgs = vec![atc_message(Some(""))];
1812        let out = OpenAiProvider::format_messages(&msgs, ReasoningPolicy::Include, true);
1813        let rc = out[0]["reasoning_content"].as_str().unwrap();
1814        assert!(
1815            !rc.is_empty(),
1816            "placeholder must replace empty-string reasoning"
1817        );
1818    }
1819
1820    #[test]
1821    fn format_messages_exclude_omits_reasoning_content_key() {
1822        // DeepSeek-R1 rejects the request if reasoning_content key is present,
1823        // so under Exclude we must NOT emit the key even when we have a value.
1824        use super::{OpenAiProvider, ReasoningPolicy};
1825        let msgs = vec![atc_message(Some("should be stripped"))];
1826        let out = OpenAiProvider::format_messages(&msgs, ReasoningPolicy::Exclude, true);
1827        assert!(
1828            out[0]
1829                .as_object()
1830                .unwrap()
1831                .get("reasoning_content")
1832                .is_none(),
1833            "reasoning_content key must be absent under Exclude, got: {}",
1834            out[0]
1835        );
1836    }
1837
1838    // ── thinking config → request body ──
1839
1840    #[test]
1841    fn thinking_body_none_when_both_unset() {
1842        use super::OpenAiProvider;
1843        // Unset = don't emit the key at all. Some OpenAI-compatible gateways
1844        // 400 on unknown top-level fields, so missing is safer than `{}`.
1845        assert!(OpenAiProvider::thinking_body_value(None, None).is_none());
1846    }
1847
1848    #[test]
1849    fn thinking_body_disabled_emits_type_only() {
1850        use super::OpenAiProvider;
1851        let out = OpenAiProvider::thinking_body_value(Some("disabled"), None).unwrap();
1852        assert_eq!(out, serde_json::json!({"type": "disabled"}));
1853    }
1854
1855    #[test]
1856    fn thinking_body_enabled_with_keep_all() {
1857        use super::OpenAiProvider;
1858        // K2.6 Preserved Thinking: the reference combination from Kimi docs.
1859        let out = OpenAiProvider::thinking_body_value(Some("enabled"), Some("all")).unwrap();
1860        assert_eq!(out, serde_json::json!({"type": "enabled", "keep": "all"}));
1861    }
1862
1863    #[test]
1864    fn thinking_fields_roundtrip_via_toml_provider_config() {
1865        // The TOML shape users will write in config.toml — flat, with a
1866        // `thinking_` prefix so each field's purpose is obvious on its own.
1867        use crate::config::provider::ProviderConfig;
1868        let toml = r#"
1869            type = "openai"
1870            model = "kimi-k2.6"
1871            base_url = "https://api.moonshot.cn/v1"
1872            api_key = "sk-x"
1873            thinking_type = "enabled"
1874            thinking_keep = "all"
1875        "#;
1876        let cfg: ProviderConfig = toml::from_str(toml).expect("TOML parse");
1877        assert_eq!(cfg.thinking_type.as_deref(), Some("enabled"));
1878        assert_eq!(cfg.thinking_keep.as_deref(), Some("all"));
1879    }
1880
1881    // ── serde backward compat for old session jsonl ──
1882
1883    #[test]
1884    fn old_jsonl_without_reasoning_content_still_deserializes() {
1885        // Session jsonl written before this field existed must still load.
1886        // `#[serde(default)]` on the field makes this work.
1887        use crate::conversation::message::MessageContent;
1888        let old = r#"{"AssistantWithToolCalls":{"text":"hi","tool_calls":[]}}"#;
1889        let parsed: MessageContent = serde_json::from_str(old)
1890            .expect("old-format AssistantWithToolCalls should deserialize");
1891        match parsed {
1892            MessageContent::AssistantWithToolCalls {
1893                text,
1894                reasoning_content,
1895                ..
1896            } => {
1897                assert_eq!(text.as_deref(), Some("hi"));
1898                assert!(reasoning_content.is_none());
1899            }
1900            other => panic!("unexpected variant: {:?}", other),
1901        }
1902    }
1903
1904    // ── provider truncation detector ──
1905
1906    #[test]
1907    fn truncation_detector_flags_gitcode_real_world_ratio() {
1908        // 5/8 atomgr session: GitCode reported 6233 prompt_tokens for a
1909        // body atomcode counted at ~78K content chars. Ratio 12.58.
1910        // This is the canary: if check_truncation ever stops firing on
1911        // this number, weak-model debugging gets harder by hours.
1912        let ratio = check_truncation(78_381, 6_233)
1913            .expect("12.58 chars/token must be flagged as truncation");
1914        assert!(ratio > 12.0 && ratio < 13.0, "ratio={}", ratio);
1915    }
1916
1917    #[test]
1918    fn truncation_detector_silent_on_normal_tokenizer() {
1919        // Siliconflow Pro/zai-org/GLM-5 same session: 127K chars / 45K
1920        // tokens = 2.81. Healthy upstream — must not warn.
1921        assert!(check_truncation(127_763, 45_518).is_none());
1922    }
1923
1924    #[test]
1925    fn truncation_detector_silent_on_english_heavy_4chars_per_token() {
1926        // Pure-English code-only request can hit ~4 chars/token. The
1927        // threshold (6.0) leaves headroom so non-truncated requests
1928        // never noise the log.
1929        assert!(check_truncation(40_000, 10_000).is_none());
1930    }
1931
/// Very small request bodies are never flagged, whatever the token count.
#[test]
fn truncation_detector_skips_tiny_bodies() {
    // For a system-only ping or a bare "hi", ratio noise dominates, so the
    // detector is expected to stay silent under 4K chars even though
    // 1500 / 100 would otherwise be a high chars-per-token ratio.
    let verdict = check_truncation(1_500, 100);
    assert!(verdict.is_none());
}
1939
/// A zero `prompt_tokens` count must yield no verdict rather than a
/// division by zero.
#[test]
fn truncation_detector_handles_zero_prompt_tokens() {
    // Some self-hosted gateways drop usage entirely; the detector should
    // simply stay silent in that case.
    let verdict = check_truncation(50_000, 0);
    assert!(verdict.is_none());
}
1946
/// Plain string contents and tool-call argument strings are both counted.
#[test]
fn sum_message_content_chars_sums_strings_and_tool_args() {
    // Expected contributions, in message order:
    //   system "abc"                      -> 3
    //   user "hello"                      -> 5
    //   assistant "" + tool-call arguments -> 12 (JSON-decoded `{"path":"a"}`)
    //   tool "result"                     -> 6
    let request = serde_json::json!({
        "model": "x",
        "messages": [
            {"role": "system", "content": "abc"},
            {"role": "user", "content": "hello"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {"function": {"name": "read_file", "arguments": "{\"path\":\"a\"}"}},
                ]
            },
            {"role": "tool", "content": "result"},
        ]
    });
    let expected = 3 + 5 + 12 + 6;
    assert_eq!(sum_message_content_chars(&request), expected);
}
1965
/// In multipart content only the text parts are counted; image URLs are
/// ignored.
#[test]
fn sum_message_content_chars_ignores_image_urls_in_multipart() {
    // URL / base64 strings in vision payloads are not real text tokens.
    // Counting them would falsely inflate the chars/token ratio for vision
    // requests, so only "describe" (8 chars) may contribute here.
    let request = serde_json::json!({
        "messages": [{
            "role": "user",
            "content": [
                {"type": "text", "text": "describe"},
                {
                    "type": "image_url",
                    "image_url": {"url": "data:image/png;base64,AAAAAAAAAA"}
                },
            ]
        }]
    });
    let counted = sum_message_content_chars(&request);
    assert_eq!(counted, 8);
}
1983
/// A request body without a `messages` key counts as zero characters.
#[test]
fn sum_message_content_chars_safe_on_missing_messages() {
    let request = serde_json::json!({"model": "x"});
    let counted = sum_message_content_chars(&request);
    assert_eq!(counted, 0);
}
1989
1990    // ── abrupt-close gateway-error discriminator ───────────────────
1991    //
1992    // GLM-5.1 / litellm-style gateways respond to a 429 by streaming
1993    // a single SSE chunk carrying a Chinese error message and then
1994    // hanging up without `data: [DONE]`. Before this code path
1995    // existed, the provider mapped both that case AND "real
1996    // mid-output truncation" to `Done { truncated: true }`, causing
1997    // the agent's resume-from-truncation retry to re-fire the same
1998    // request and render the same error message twice. Tests below
1999    // pin the new behavior:
2000    //
2001    //   * 1 short content chunk, no `[DONE]`           → Error
2002    //   * many content chunks + abrupt close           → Done(truncated=true)
2003    //   * 1 short chunk + tool_call + abrupt close     → Done(truncated=true)
2004    //     (model was making tool progress; let resume retry try again)
2005
2006    use crate::config::provider::ProviderConfig;
2007    use crate::provider::LlmProvider;
2008    use futures::StreamExt;
2009    use wiremock::matchers::{method, path};
2010    use wiremock::{Mock, MockServer, ResponseTemplate};
2011
2012    fn provider_pointing_at(url: &str) -> OpenAiProvider {
2013        OpenAiProvider::new(&ProviderConfig {
2014            provider_type: "openai".into(),
2015            api_key: Some("sk-test".into()),
2016            model: "test-model".into(),
2017            base_url: Some(format!("{}/v1", url)),
2018            system_prompt: None,
2019            user_agent: None,
2020            context_window: 8000,
2021            max_tokens: Some(1024),
2022            thinking_type: None,
2023            thinking_keep: None,
2024            reasoning_history: None,
2025            thinking_enabled: None,
2026            thinking_budget: None,
2027            skip_tls_verify: false,
2028            ephemeral: false,
2029        })
2030        .expect("provider construction")
2031    }
2032
2033    async fn collect_stream(p: &OpenAiProvider) -> Vec<StreamEvent> {
2034        let msg = Message {
2035            role: Role::User,
2036            content: MessageContent::Text("hi".into()),
2037        };
2038        let mut stream = p.chat_stream(&[msg], None).expect("stream");
2039        let mut out = Vec::new();
2040        while let Some(ev) = stream.next().await {
2041            match ev {
2042                Ok(e) => out.push(e),
2043                Err(e) => panic!("transport error: {:#}", e),
2044            }
2045        }
2046        out
2047    }
2048
2049    /// Gateway streams ONE chunk with an error blob, no DONE, then
2050    /// closes. Provider must surface that as `Error(blob)`, NOT as
2051    /// `Done { truncated: true }` (which would trigger the agent's
2052    /// resume-retry and render the same blob twice).
2053    #[tokio::test]
2054    async fn abrupt_close_with_single_error_chunk_becomes_stream_error() {
2055        let server = MockServer::start().await;
2056        let sse = "data: {\"choices\":[{\"delta\":{\"content\":\
2057                   \"模型「GLM-5.1」的请求负载过高,请稍后再试。\"}}]}\n\n";
2058        Mock::given(method("POST"))
2059            .and(path("/v1/chat/completions"))
2060            .respond_with(
2061                ResponseTemplate::new(200)
2062                    .insert_header("content-type", "text/event-stream")
2063                    .set_body_string(sse),
2064            )
2065            .mount(&server)
2066            .await;
2067
2068        let p = provider_pointing_at(&server.uri());
2069        let events = collect_stream(&p).await;
2070        let has_error = events
2071            .iter()
2072            .any(|e| matches!(e, StreamEvent::Error(s) if s.contains("请求负载过高")));
2073        let has_truncated_done = events
2074            .iter()
2075            .any(|e| matches!(e, StreamEvent::Done { truncated: true }));
2076        let has_marker_delta = events.iter().any(|e| {
2077            matches!(e, StreamEvent::Delta(s) if s.contains("stream ended without close marker"))
2078        });
2079        assert!(
2080            has_error,
2081            "expected StreamEvent::Error(gateway blob), got: {:?}",
2082            events
2083        );
2084        assert!(
2085            !has_truncated_done,
2086            "abrupt close on tiny error blob must NOT emit Done(truncated=true): {:?}",
2087            events
2088        );
2089        assert!(
2090            !has_marker_delta,
2091            "abrupt close on tiny error blob must NOT emit the [stream ended …] marker delta: {:?}",
2092            events
2093        );
2094    }
2095
2096    /// Real-truncation case: many chunks of substantive content,
2097    /// then abrupt close (no DONE / no finish_reason). Stays on the
2098    /// existing `Done { truncated: true }` path so the agent's
2099    /// "resume where you left off" retry can salvage the partial
2100    /// output (table-cut, list-cut, etc.).
2101    #[tokio::test]
2102    async fn abrupt_close_with_substantive_content_still_emits_truncated_done() {
2103        let server = MockServer::start().await;
2104        // 5 chunks × ~50 chars each = ~250 chars of real content.
2105        // Above the 200-char and 2-chunk thresholds → not a
2106        // gateway error.
2107        let mut sse = String::new();
2108        for i in 0..5 {
2109            sse.push_str(&format!(
2110                "data: {{\"choices\":[{{\"delta\":{{\"content\":\
2111                 \"line {} with enough content to clear the heuristic thresholds. \"}}}}]}}\n\n",
2112                i
2113            ));
2114        }
2115        Mock::given(method("POST"))
2116            .and(path("/v1/chat/completions"))
2117            .respond_with(
2118                ResponseTemplate::new(200)
2119                    .insert_header("content-type", "text/event-stream")
2120                    .set_body_string(sse),
2121            )
2122            .mount(&server)
2123            .await;
2124
2125        let p = provider_pointing_at(&server.uri());
2126        let events = collect_stream(&p).await;
2127        let has_truncated_done = events
2128            .iter()
2129            .any(|e| matches!(e, StreamEvent::Done { truncated: true }));
2130        let has_error = events.iter().any(|e| matches!(e, StreamEvent::Error(_)));
2131        assert!(
2132            has_truncated_done,
2133            "substantive content + abrupt close must keep Done(truncated=true): {:?}",
2134            events
2135        );
2136        assert!(
2137            !has_error,
2138            "real truncation must NOT be misclassified as Error: {:?}",
2139            events
2140        );
2141    }
2142
2143    // 401 from a non-atomgit gateway must keep the verbatim server
2144    // error message — user-supplied `sk-...` API keys are not
2145    // refreshable by us, and developers debugging a bad key need to
2146    // see what the upstream actually said. The new auth-retry path is
2147    // only allowed to engage for AtomGit gateway hosts.
2148    #[tokio::test]
2149    async fn non_atomgit_gateway_401_keeps_verbatim_error() {
2150        let server = MockServer::start().await;
2151        let body = r#"{"error":{"message":"invalid_api_key"}}"#;
2152        Mock::given(method("POST"))
2153            .and(path("/v1/chat/completions"))
2154            .respond_with(
2155                ResponseTemplate::new(401)
2156                    .insert_header("content-type", "application/json")
2157                    .set_body_string(body),
2158            )
2159            .mount(&server)
2160            .await;
2161
2162        let p = provider_pointing_at(&server.uri());
2163        let events = collect_stream(&p).await;
2164        let err_msg = events
2165            .iter()
2166            .find_map(|e| match e {
2167                StreamEvent::Error(s) => Some(s.clone()),
2168                _ => None,
2169            })
2170            .unwrap_or_else(|| panic!("expected StreamEvent::Error, got: {:?}", events));
2171        assert!(
2172            err_msg.contains("invalid_api_key"),
2173            "non-atomgit 401 must include verbatim server message: {}",
2174            err_msg
2175        );
2176        // The i18n-friendly hint is reserved for atomgit-gateway 401s;
2177        // it must NOT leak into the generic OpenAI / sk-... path.
2178        let friendly = crate::i18n::t(crate::i18n::Msg::ChatAuthExpired).to_string();
2179        assert!(
2180            !err_msg.contains(&friendly),
2181            "non-atomgit 401 must NOT receive the AtomGit /login hint: got {}",
2182            err_msg
2183        );
2184    }
2185}
2186
#[cfg(test)]
mod chat_auth_expired_i18n_tests {
    use crate::i18n::{t_with, Locale, Msg};

    /// Renders `Msg::ChatAuthExpired` for the given locale as an owned string.
    fn auth_expired_text(locale: Locale) -> String {
        t_with(locale, Msg::ChatAuthExpired).to_string()
    }

    /// Both locales must produce some text: an empty hint would defeat the
    /// purpose of the branch, which is to point the user at /login.
    #[test]
    fn message_is_non_empty_in_both_locales() {
        let zh = auth_expired_text(Locale::ZhCn);
        let en = auth_expired_text(Locale::En);
        assert!(!zh.is_empty(), "zh message must be populated");
        assert!(!en.is_empty(), "en message must be populated");
    }

    /// The message exists to direct the user to re-authenticate; if a
    /// future translation drops the `/login` reference, the hint becomes
    /// useless.
    #[test]
    fn message_mentions_login_in_both_locales() {
        let zh = auth_expired_text(Locale::ZhCn);
        let en = auth_expired_text(Locale::En);
        assert!(zh.contains("/login"), "zh message must mention /login: {}", zh);
        assert!(en.contains("/login"), "en message must mention /login: {}", en);
    }
}
2212
#[cfg(test)]
mod codingplan_signing_tests {
    use super::*;

    /// Base URL on the AtomGit gateway host used by the signing tests.
    const ATOMGIT_BASE_URL: &str = "https://llm-api.atomgit.com/v1";
    /// Minimal request body handed to the header builder.
    const EMPTY_JSON_BODY: &[u8] = b"{}";

    /// A host that is not the AtomGit gateway needs no signing: the call
    /// succeeds and yields no headers.
    #[test]
    fn build_signed_headers_returns_empty_for_non_atomgit_host() {
        let outcome = build_codingplan_headers("https://api.openai.com/v1", EMPTY_JSON_BODY, None);
        let headers = outcome.expect("non-atomgit host must not error");
        assert!(headers.is_empty(), "got unexpected headers: {:?}", headers);
    }

    /// Open-source build: `signer()` is the UnavailableSigner, so an
    /// atomgit-bound request must fail with the localised hint.
    #[test]
    fn build_signed_headers_errors_when_atomgit_host_in_open_source_build() {
        let credentials = Some(("dummy-user-id", "dummy-token"));
        let outcome = build_codingplan_headers(ATOMGIT_BASE_URL, EMPTY_JSON_BODY, credentials);
        let err = outcome.expect_err("open-source build must error out");

        let msg = format!("{:#}", err);
        // Accept either the English or the Chinese wording of the hint.
        let mentions_official_build = ["official", "官方"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(
            mentions_official_build,
            "error message should mention the official-build requirement, got: {msg}",
        );
    }

    /// Empty user id and access token on the AtomGit host must be rejected
    /// with a non-empty error message.
    #[test]
    fn build_signed_headers_errors_when_atomgit_host_with_empty_auth() {
        let credentials = Some(("", ""));
        let outcome = build_codingplan_headers(ATOMGIT_BASE_URL, EMPTY_JSON_BODY, credentials);
        let err = outcome.expect_err("empty auth must error");

        let rendered = format!("{:#}", err);
        assert!(!rendered.is_empty());
    }
}