Skip to main content

codex_helper_core/proxy/
mod.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex};
3use std::time::Instant;
4
5use anyhow::{Result, anyhow};
6use axum::Json;
7use axum::Router;
8use axum::body::{Body, Bytes, to_bytes};
9use axum::extract::Query;
10use axum::http::{HeaderMap, HeaderName, HeaderValue, Method, Request, Response, StatusCode, Uri};
11use axum::routing::{any, get, post};
12use reqwest::Client;
13use std::sync::OnceLock;
14use tracing::{instrument, warn};
15
16mod classify;
17mod retry;
18mod runtime_config;
19mod stream;
20#[cfg(test)]
21mod tests;
22
23use crate::config::{ProxyConfig, RetryStrategy, ServiceConfigManager};
24use crate::filter::RequestFilter;
25use crate::lb::{LbState, LoadBalancer, SelectedUpstream};
26use crate::logging::{
27    AuthResolutionLog, BodyPreview, HeaderEntry, HttpDebugLog, http_debug_options,
28    http_warn_options, log_request_with_debug, log_retry_trace, make_body_preview,
29    should_include_http_warn, should_log_request_body_preview,
30};
31use crate::model_routing;
32use crate::state::{ActiveRequest, FinishedRequest, ProxyState};
33use crate::usage::extract_usage_from_bytes;
34
35use self::classify::classify_upstream_response;
36use self::retry::{
37    backoff_sleep, retry_info_for_chain, retry_plan, retry_sleep, should_never_retry,
38    should_retry_class, should_retry_status,
39};
40use self::runtime_config::RuntimeConfig;
41use self::stream::{SseSuccessMeta, build_sse_success_response};
42
43fn format_reqwest_error_for_retry_chain(e: &reqwest::Error) -> String {
44    use std::error::Error as _;
45
46    let mut parts: Vec<String> = Vec::new();
47    let first = e.to_string();
48    if !first.trim().is_empty() {
49        parts.push(first);
50    }
51
52    let mut cur = e.source();
53    for _ in 0..4 {
54        let Some(src) = cur else { break };
55        let msg = src.to_string();
56        if !msg.trim().is_empty() && !parts.iter().any(|x| x == &msg) {
57            parts.push(msg);
58        }
59        cur = src.source();
60    }
61
62    let mut flags: Vec<&'static str> = Vec::new();
63    if e.is_timeout() {
64        flags.push("timeout");
65    }
66    if e.is_connect() {
67        flags.push("connect");
68    }
69
70    let mut out = if parts.is_empty() {
71        "reqwest error".to_string()
72    } else {
73        parts.join(" | caused_by: ")
74    };
75    if !flags.is_empty() {
76        out.push_str(" (flags: ");
77        out.push_str(&flags.join(","));
78        out.push(')');
79    }
80    out = out.replace(['\r', '\n'], " ");
81    const MAX_LEN: usize = 360;
82    if out.len() > MAX_LEN {
83        out.truncate(MAX_LEN);
84        out.push('…');
85    }
86    out
87}
88
89#[allow(dead_code)]
90fn lb_state_snapshot_json(lb: &LoadBalancer) -> Option<serde_json::Value> {
91    let map = match lb.states.lock() {
92        Ok(m) => m,
93        Err(e) => e.into_inner(),
94    };
95    let st = map.get(&lb.service.name)?;
96    let now = std::time::Instant::now();
97    let upstreams = (0..lb.service.upstreams.len())
98        .map(|idx| {
99            let cooldown_remaining_ms = st
100                .cooldown_until
101                .get(idx)
102                .and_then(|x| *x)
103                .map(|until| until.saturating_duration_since(now).as_millis() as u64)
104                .filter(|&ms| ms > 0);
105            serde_json::json!({
106                "idx": idx,
107                "failure_count": st.failure_counts.get(idx).copied(),
108                "penalty_streak": st.penalty_streak.get(idx).copied(),
109                "usage_exhausted": st.usage_exhausted.get(idx).copied(),
110                "cooldown_remaining_ms": cooldown_remaining_ms,
111            })
112        })
113        .collect::<Vec<_>>();
114    Some(serde_json::json!({
115        "last_good_index": st.last_good_index,
116        "upstreams": upstreams,
117    }))
118}
119
120fn read_json_file(path: &std::path::Path) -> Option<serde_json::Value> {
121    let bytes = std::fs::read(path).ok()?;
122    let text = String::from_utf8_lossy(&bytes);
123    if text.trim().is_empty() {
124        return None;
125    }
126    serde_json::from_str(&text).ok()
127}
128
129fn codex_auth_json_value(key: &str) -> Option<String> {
130    static CACHE: std::sync::OnceLock<Option<serde_json::Value>> = std::sync::OnceLock::new();
131    let v = CACHE.get_or_init(|| read_json_file(&crate::config::codex_auth_path()));
132    let obj = v.as_ref()?.as_object()?;
133    obj.get(key).and_then(|x| x.as_str()).map(|s| s.to_string())
134}
135
136fn claude_settings_env_value(key: &str) -> Option<String> {
137    static CACHE: std::sync::OnceLock<Option<serde_json::Value>> = std::sync::OnceLock::new();
138    let v = CACHE.get_or_init(|| read_json_file(&crate::config::claude_settings_path()));
139    let obj = v.as_ref()?.as_object()?;
140    let env_obj = obj.get("env")?.as_object()?;
141    env_obj
142        .get(key)
143        .and_then(|x| x.as_str())
144        .map(|s| s.to_string())
145}
146
147fn resolve_auth_token_with_source(
148    service_name: &str,
149    auth: &crate::config::UpstreamAuth,
150    client_has_auth: bool,
151) -> (Option<String>, String) {
152    if let Some(token) = auth.auth_token.as_deref()
153        && !token.trim().is_empty()
154    {
155        return (Some(token.to_string()), "inline".to_string());
156    }
157
158    if let Some(env_name) = auth.auth_token_env.as_deref()
159        && !env_name.trim().is_empty()
160    {
161        if let Ok(v) = std::env::var(env_name)
162            && !v.trim().is_empty()
163        {
164            return (Some(v), format!("env:{env_name}"));
165        }
166
167        let file_value = match service_name {
168            "codex" => codex_auth_json_value(env_name),
169            "claude" => claude_settings_env_value(env_name),
170            _ => None,
171        };
172        if let Some(v) = file_value
173            && !v.trim().is_empty()
174        {
175            let src = match service_name {
176                "codex" => format!("codex_auth_json:{env_name}"),
177                "claude" => format!("claude_settings_env:{env_name}"),
178                _ => format!("file:{env_name}"),
179            };
180            return (Some(v), src);
181        }
182
183        if client_has_auth {
184            return (None, format!("client_passthrough (missing_env:{env_name})"));
185        }
186        return (None, format!("missing_env:{env_name}"));
187    }
188
189    if client_has_auth {
190        (None, "client_passthrough".to_string())
191    } else {
192        (None, "none".to_string())
193    }
194}
195
196fn resolve_api_key_with_source(
197    service_name: &str,
198    auth: &crate::config::UpstreamAuth,
199    client_has_x_api_key: bool,
200) -> (Option<String>, String) {
201    if let Some(key) = auth.api_key.as_deref()
202        && !key.trim().is_empty()
203    {
204        return (Some(key.to_string()), "inline".to_string());
205    }
206
207    if let Some(env_name) = auth.api_key_env.as_deref()
208        && !env_name.trim().is_empty()
209    {
210        if let Ok(v) = std::env::var(env_name)
211            && !v.trim().is_empty()
212        {
213            return (Some(v), format!("env:{env_name}"));
214        }
215
216        let file_value = match service_name {
217            "codex" => codex_auth_json_value(env_name),
218            "claude" => claude_settings_env_value(env_name),
219            _ => None,
220        };
221        if let Some(v) = file_value
222            && !v.trim().is_empty()
223        {
224            let src = match service_name {
225                "codex" => format!("codex_auth_json:{env_name}"),
226                "claude" => format!("claude_settings_env:{env_name}"),
227                _ => format!("file:{env_name}"),
228            };
229            return (Some(v), src);
230        }
231
232        if client_has_x_api_key {
233            return (None, format!("client_passthrough (missing_env:{env_name})"));
234        }
235        return (None, format!("missing_env:{env_name}"));
236    }
237
238    if client_has_x_api_key {
239        (None, "client_passthrough".to_string())
240    } else {
241        (None, "none".to_string())
242    }
243}
244
/// Reports whether `name_lower` is one of the header names this proxy never forwards
/// because they are treated as hop-by-hop.
///
/// The comparison is exact, so the caller must pass an already lower-cased name.
fn is_hop_by_hop_header(name_lower: &str) -> bool {
    const HOP_BY_HOP: [&str; 9] = [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "trailers",
        "transfer-encoding",
        "upgrade",
    ];
    HOP_BY_HOP.iter().any(|known| *known == name_lower)
}
259
260fn hop_by_hop_connection_tokens(headers: &HeaderMap) -> Vec<String> {
261    let mut out = Vec::new();
262    for value in headers.get_all("connection").iter() {
263        let Ok(s) = value.to_str() else {
264            continue;
265        };
266        for token in s.split(',').map(|t| t.trim()).filter(|t| !t.is_empty()) {
267            out.push(token.to_ascii_lowercase());
268        }
269    }
270    out
271}
272
273fn filter_request_headers(src: &HeaderMap) -> HeaderMap {
274    let extra = hop_by_hop_connection_tokens(src);
275    let mut out = HeaderMap::new();
276    for (name, value) in src.iter() {
277        let name_lower = name.as_str().to_ascii_lowercase();
278        if name_lower == "host"
279            || name_lower == "content-length"
280            || is_hop_by_hop_header(&name_lower)
281        {
282            continue;
283        }
284        if extra.iter().any(|t| t == &name_lower) {
285            continue;
286        }
287        out.append(name.clone(), value.clone());
288    }
289    out
290}
291
292fn filter_response_headers(src: &HeaderMap) -> HeaderMap {
293    let extra = hop_by_hop_connection_tokens(src);
294    let mut out = HeaderMap::new();
295    for (name, value) in src.iter() {
296        let name_lower = name.as_str().to_ascii_lowercase();
297        // reqwest 可能会自动解压响应体;为避免 content-length/content-encoding 与实际 body 不一致,这里不透传它们。
298        if is_hop_by_hop_header(&name_lower)
299            || name_lower == "content-length"
300            || name_lower == "content-encoding"
301        {
302            continue;
303        }
304        if extra.iter().any(|t| t == &name_lower) {
305            continue;
306        }
307        out.append(name.clone(), value.clone());
308    }
309    out
310}
311
312fn header_map_to_entries(headers: &HeaderMap) -> Vec<HeaderEntry> {
313    fn is_sensitive(name_lower: &str) -> bool {
314        matches!(
315            name_lower,
316            "authorization"
317                | "proxy-authorization"
318                | "cookie"
319                | "set-cookie"
320                | "x-api-key"
321                | "x-forwarded-api-key"
322                | "x-goog-api-key"
323        )
324    }
325
326    let mut out = Vec::new();
327    for (name, value) in headers.iter() {
328        let name_lower = name.as_str().to_ascii_lowercase();
329        let v = if is_sensitive(name_lower.as_str()) {
330            "[REDACTED]".to_string()
331        } else {
332            String::from_utf8_lossy(value.as_bytes()).into_owned()
333        };
334        out.push(HeaderEntry {
335            name: name.as_str().to_string(),
336            value: v,
337        });
338    }
339    out
340}
341
/// Request-side values bundled together for HTTP debug / warn logging.
///
/// NOTE(review): the code that fills and consumes this struct is outside the visible
/// part of the file; the field notes below are inferred from names and types only and
/// should be confirmed against the call sites.
342#[derive(Clone)]
343struct HttpDebugBase {
    // Presumably the body-preview size limits for the debug and warn paths — confirm.
344    debug_max_body_bytes: usize,
345    warn_max_body_bytes: usize,
    // The two length fields are explicitly allowed to be unused (`dead_code`).
346    #[allow(dead_code)]
347    request_body_len: usize,
348    #[allow(dead_code)]
349    upstream_request_body_len: usize,
    // URI as received from the client and the URL sent upstream, as strings.
350    client_uri: String,
351    target_url: String,
    // Header lists in loggable form (`HeaderEntry`).
352    client_headers: Vec<HeaderEntry>,
353    upstream_request_headers: Vec<HeaderEntry>,
354    auth_resolution: Option<AuthResolutionLog>,
    // Optional body previews, kept separately for the debug and warn paths.
355    client_body_debug: Option<BodyPreview>,
356    upstream_request_body_debug: Option<BodyPreview>,
357    client_body_warn: Option<BodyPreview>,
358    upstream_request_body_warn: Option<BodyPreview>,
359}
360
361fn warn_http_debug(status_code: u16, http_debug: &HttpDebugLog) {
362    let max_chars = 2048usize;
363    let Ok(mut json) = serde_json::to_string(http_debug) else {
364        return;
365    };
366    if json.chars().count() > max_chars {
367        json = json.chars().take(max_chars).collect::<String>() + "...[TRUNCATED_FOR_LOG]";
368    }
369    warn!("upstream non-2xx http_debug={json} status_code={status_code}");
370}
371
372/// Generic proxy service; currently used by both Codex and Claude.
///
/// `Clone` is derived, so clones share the `Arc`-wrapped `config`, `lb_states` and
/// `state` with the original.
373#[derive(Clone)]
374pub struct ProxyService {
    /// HTTP client handed to `new` (presumably used for upstream requests — the call
    /// sites are outside the visible part of the file).
375    pub client: Client,
    /// Runtime configuration wrapper; snapshots are taken per request.
376    config: Arc<RuntimeConfig>,
    /// Service identifier; `"codex"` and `"claude"` are the values matched on.
377    pub service_name: &'static str,
    /// Load-balancer state map, passed to every `LoadBalancer` built here.
378    lb_states: Arc<Mutex<HashMap<String, LbState>>>,
379    filter: RequestFilter,
    /// Shared proxy state (session/global overrides, usage replay, …).
380    state: Arc<ProxyState>,
381}
382
383impl ProxyService {
384    pub fn new(
385        client: Client,
386        config: Arc<ProxyConfig>,
387        service_name: &'static str,
388        lb_states: Arc<Mutex<HashMap<String, LbState>>>,
389    ) -> Self {
390        let state = ProxyState::new_with_lb_states(Some(lb_states.clone()));
391        ProxyState::spawn_cleanup_task(state.clone());
392        if !cfg!(test) {
393            let state = state.clone();
394            let log_path = crate::config::proxy_home_dir()
395                .join("logs")
396                .join("requests.jsonl");
397            let mut base_url_to_provider_id = HashMap::new();
398            let mgr = match service_name {
399                "claude" => &config.claude,
400                _ => &config.codex,
401            };
402            for svc in mgr.configs.values() {
403                for up in &svc.upstreams {
404                    if let Some(pid) = up.tags.get("provider_id") {
405                        base_url_to_provider_id.insert(up.base_url.clone(), pid.clone());
406                    }
407                }
408            }
409            tokio::spawn(async move {
410                let _ = state
411                    .replay_usage_from_requests_log(service_name, log_path, base_url_to_provider_id)
412                    .await;
413            });
414        }
415        Self {
416            client,
417            config: Arc::new(RuntimeConfig::new(config)),
418            service_name,
419            lb_states,
420            filter: RequestFilter::new(),
421            state,
422        }
423    }
424
425    fn service_manager<'a>(&self, cfg: &'a ProxyConfig) -> &'a ServiceConfigManager {
426        match self.service_name {
427            "codex" => &cfg.codex,
428            "claude" => &cfg.claude,
429            _ => &cfg.codex,
430        }
431    }
432
433    async fn pinned_config(&self, session_id: Option<&str>) -> Option<(String, &'static str)> {
434        if let Some(sid) = session_id
435            && let Some(name) = self.state.get_session_config_override(sid).await
436            && !name.trim().is_empty()
437        {
438            return Some((name, "session"));
439        }
440        if let Some(name) = self.state.get_global_config_override().await
441            && !name.trim().is_empty()
442        {
443            return Some((name, "global"));
444        }
445        None
446    }
447
448    async fn lbs_for_request(
449        &self,
450        cfg: &ProxyConfig,
451        session_id: Option<&str>,
452    ) -> Vec<LoadBalancer> {
453        let mgr = self.service_manager(cfg);
454        let meta_overrides = self
455            .state
456            .get_config_meta_overrides(self.service_name)
457            .await;
458        if let Some((name, source)) = self.pinned_config(session_id).await {
459            if let Some(svc) = mgr
460                .configs
461                .get(&name)
462                .or_else(|| mgr.active_config())
463                .cloned()
464            {
465                log_retry_trace(serde_json::json!({
466                    "event": "lbs_for_request",
467                    "service": self.service_name,
468                    "session_id": session_id,
469                    "mode": "pinned",
470                    "pinned_source": source,
471                    "pinned_name": name,
472                    "selected_config": svc.name,
473                    "selected_level": svc.level.clamp(1, 10),
474                    "selected_upstreams": svc.upstreams.len(),
475                    "active_config": mgr.active.as_deref(),
476                    "config_count": mgr.configs.len(),
477                }));
478                return vec![LoadBalancer::new(Arc::new(svc), self.lb_states.clone())];
479            }
480            log_retry_trace(serde_json::json!({
481                "event": "lbs_for_request",
482                "service": self.service_name,
483                "session_id": session_id,
484                "mode": "pinned",
485                "pinned_source": source,
486                "pinned_name": name,
487                "selected_config": null,
488                "active_config": mgr.active.as_deref(),
489                "config_count": mgr.configs.len(),
490                "note": "pinned_config_not_found",
491            }));
492            return Vec::new();
493        }
494
495        let active_name = mgr.active.as_deref();
496        let mut configs = mgr
497            .configs
498            .iter()
499            .filter(|(name, svc)| {
500                let (enabled_ovr, _) = meta_overrides
501                    .get(name.as_str())
502                    .copied()
503                    .unwrap_or((None, None));
504                let enabled = enabled_ovr.unwrap_or(svc.enabled);
505                !svc.upstreams.is_empty()
506                    && (enabled || active_name.is_some_and(|n| n == name.as_str()))
507            })
508            .collect::<Vec<_>>();
509
510        let has_multi_level = {
511            let mut levels = configs
512                .iter()
513                .map(|(name, svc)| {
514                    let (_, level_ovr) = meta_overrides
515                        .get(name.as_str())
516                        .copied()
517                        .unwrap_or((None, None));
518                    level_ovr.unwrap_or(svc.level).clamp(1, 10)
519                })
520                .collect::<Vec<_>>();
521            levels.sort_unstable();
522            levels.dedup();
523            levels.len() > 1
524        };
525
526        if !has_multi_level {
527            let eligible_details = || {
528                configs
529                    .iter()
530                    .map(|(name, svc)| {
531                        let (_, level_ovr) = meta_overrides
532                            .get(name.as_str())
533                            .copied()
534                            .unwrap_or((None, None));
535                        serde_json::json!({
536                            "name": (*name).clone(),
537                            "level": level_ovr.unwrap_or(svc.level).clamp(1, 10),
538                            "enabled": svc.enabled,
539                            "upstreams": svc.upstreams.len(),
540                        })
541                    })
542                    .collect::<Vec<_>>()
543            };
544
545            let mut ordered = configs
546                .iter()
547                .map(|(name, svc)| ((*name).clone(), (*svc).clone()))
548                .collect::<Vec<_>>();
549            ordered.sort_by(|(a, _), (b, _)| a.cmp(b));
550            if let Some(active) = active_name
551                && let Some(pos) = ordered.iter().position(|(n, _)| n == active)
552            {
553                let item = ordered.remove(pos);
554                ordered.insert(0, item);
555            }
556
557            let lbs = ordered
558                .into_iter()
559                .map(|(_, svc)| LoadBalancer::new(Arc::new(svc), self.lb_states.clone()))
560                .collect::<Vec<_>>();
561            if !lbs.is_empty() {
562                log_retry_trace(serde_json::json!({
563                    "event": "lbs_for_request",
564                    "service": self.service_name,
565                    "session_id": session_id,
566                    "mode": "single_level_multi",
567                    "active_config": active_name,
568                    "selected_configs": lbs.iter().map(|lb| lb.service.name.clone()).collect::<Vec<_>>(),
569                    "eligible_configs": configs.iter().map(|(n, _)| (*n).clone()).collect::<Vec<_>>(),
570                    "eligible_details": eligible_details(),
571                    "eligible_count": configs.len(),
572                }));
573                return lbs;
574            }
575
576            if let Some(svc) = mgr.active_config().cloned() {
577                log_retry_trace(serde_json::json!({
578                    "event": "lbs_for_request",
579                    "service": self.service_name,
580                    "session_id": session_id,
581                    "mode": "single_level_fallback_active_config",
582                    "active_config": active_name,
583                    "selected_config": svc.name,
584                    "selected_level": svc.level.clamp(1, 10),
585                    "selected_upstreams": svc.upstreams.len(),
586                    "eligible_configs": configs.iter().map(|(n, _)| (*n).clone()).collect::<Vec<_>>(),
587                    "eligible_details": eligible_details(),
588                    "eligible_count": configs.len(),
589                }));
590                return vec![LoadBalancer::new(Arc::new(svc), self.lb_states.clone())];
591            }
592
593            log_retry_trace(serde_json::json!({
594                "event": "lbs_for_request",
595                "service": self.service_name,
596                "session_id": session_id,
597                "mode": "single_level_empty",
598                "active_config": active_name,
599                "eligible_configs": configs.iter().map(|(n, _)| (*n).clone()).collect::<Vec<_>>(),
600                "eligible_details": eligible_details(),
601                "eligible_count": configs.len(),
602            }));
603            return Vec::new();
604        }
605
606        configs.sort_by(|(a_name, a), (b_name, b)| {
607            let a_level = meta_overrides
608                .get(a_name.as_str())
609                .and_then(|(_, l)| *l)
610                .unwrap_or(a.level)
611                .clamp(1, 10);
612            let b_level = meta_overrides
613                .get(b_name.as_str())
614                .and_then(|(_, l)| *l)
615                .unwrap_or(b.level)
616                .clamp(1, 10);
617            let a_active = active_name.is_some_and(|n| n == a_name.as_str());
618            let b_active = active_name.is_some_and(|n| n == b_name.as_str());
619            a_level
620                .cmp(&b_level)
621                .then_with(|| b_active.cmp(&a_active))
622                .then_with(|| a_name.cmp(b_name))
623        });
624
625        let lbs = configs
626            .into_iter()
627            .map(|(_, svc)| LoadBalancer::new(Arc::new(svc.clone()), self.lb_states.clone()))
628            .collect::<Vec<_>>();
629        if !lbs.is_empty() {
630            log_retry_trace(serde_json::json!({
631                "event": "lbs_for_request",
632                "service": self.service_name,
633                "session_id": session_id,
634                "mode": "multi_level",
635                "active_config": active_name,
636                "eligible_configs": lbs.iter().map(|lb| serde_json::json!({
637                    "name": lb.service.name,
638                    "level": lb.service.level.clamp(1, 10),
639                    "upstreams": lb.service.upstreams.len(),
640                })).collect::<Vec<_>>(),
641                "eligible_count": lbs.len(),
642            }));
643            return lbs;
644        }
645
646        if let Some(svc) = mgr.active_config().cloned() {
647            log_retry_trace(serde_json::json!({
648                "event": "lbs_for_request",
649                "service": self.service_name,
650                "session_id": session_id,
651                "mode": "multi_level_fallback_active_config",
652                "active_config": active_name,
653                "selected_config": svc.name,
654                "selected_level": svc.level.clamp(1, 10),
655                "selected_upstreams": svc.upstreams.len(),
656            }));
657            return vec![LoadBalancer::new(Arc::new(svc), self.lb_states.clone())];
658        }
659        log_retry_trace(serde_json::json!({
660            "event": "lbs_for_request",
661            "service": self.service_name,
662            "session_id": session_id,
663            "mode": "multi_level_empty",
664            "active_config": active_name,
665        }));
666        Vec::new()
667    }
668
669    fn build_target(
670        &self,
671        upstream: &SelectedUpstream,
672        uri: &Uri,
673    ) -> Result<(reqwest::Url, HeaderMap)> {
674        let base = upstream.upstream.base_url.trim_end_matches('/').to_string();
675
676        let base_url = reqwest::Url::parse(&base)
677            .map_err(|e| anyhow!("invalid upstream base_url {base}: {e}"))?;
678        let base_path = base_url.path().trim_end_matches('/').to_string();
679
680        let mut path = uri.path().to_string();
681        if !base_path.is_empty()
682            && base_path != "/"
683            && (path == base_path || path.starts_with(&format!("{base_path}/")))
684        {
685            // If the incoming request path already contains the base_url path prefix,
686            // strip it to avoid double-prefixing (e.g. base_url=/v1 and request=/v1/responses).
687            let rest = &path[base_path.len()..];
688            path = if rest.is_empty() {
689                "/".to_string()
690            } else {
691                rest.to_string()
692            };
693            if !path.starts_with('/') {
694                path = format!("/{path}");
695            }
696        }
697        let path_and_query = if let Some(q) = uri.query() {
698            format!("{path}?{q}")
699        } else {
700            path
701        };
702
703        let full = format!("{base}{path_and_query}");
704        let url =
705            reqwest::Url::parse(&full).map_err(|e| anyhow!("invalid upstream url {full}: {e}"))?;
706
707        // ensure query preserved (Url::parse already includes it)
708        let headers = HeaderMap::new();
709        Ok((url, headers))
710    }
711
712    pub fn state_handle(&self) -> Arc<ProxyState> {
713        self.state.clone()
714    }
715}
716
717fn header_str<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> {
718    headers.get(name).and_then(|v| v.to_str().ok())
719}
720
721fn extract_session_id(headers: &HeaderMap) -> Option<String> {
722    header_str(headers, "session_id")
723        .or_else(|| header_str(headers, "conversation_id"))
724        .map(|s| s.to_string())
725}
726
727fn extract_reasoning_effort_from_request_body(body: &[u8]) -> Option<String> {
728    let v: serde_json::Value = serde_json::from_slice(body).ok()?;
729    v.get("reasoning")
730        .and_then(|r| r.get("effort"))
731        .and_then(|e| e.as_str())
732        .map(|s| s.to_string())
733}
734
735fn extract_model_from_request_body(body: &[u8]) -> Option<String> {
736    let v: serde_json::Value = serde_json::from_slice(body).ok()?;
737    v.get("model")
738        .and_then(|m| m.as_str())
739        .map(|s| s.to_string())
740}
741
742fn apply_reasoning_effort_override(body: &[u8], effort: &str) -> Option<Vec<u8>> {
743    let mut v: serde_json::Value = serde_json::from_slice(body).ok()?;
744    let reasoning = v.get_mut("reasoning").and_then(|r| r.as_object_mut());
745    if let Some(obj) = reasoning {
746        obj.insert(
747            "effort".to_string(),
748            serde_json::Value::String(effort.to_string()),
749        );
750    } else {
751        let mut obj = serde_json::Map::new();
752        obj.insert(
753            "effort".to_string(),
754            serde_json::Value::String(effort.to_string()),
755        );
756        v.as_object_mut()?
757            .insert("reasoning".to_string(), serde_json::Value::Object(obj));
758    }
759    serde_json::to_vec(&v).ok()
760}
761
762fn apply_model_override(body: &[u8], model: &str) -> Option<Vec<u8>> {
763    let mut v: serde_json::Value = serde_json::from_slice(body).ok()?;
764    v.as_object_mut()?.insert(
765        "model".to_string(),
766        serde_json::Value::String(model.to_string()),
767    );
768    serde_json::to_vec(&v).ok()
769}
770
771#[instrument(skip_all, fields(service = %proxy.service_name))]
772pub async fn handle_proxy(
773    proxy: ProxyService,
774    req: Request<Body>,
775) -> Result<Response<Body>, (StatusCode, String)> {
776    let start = Instant::now();
777    let started_at_ms = std::time::SystemTime::now()
778        .duration_since(std::time::UNIX_EPOCH)
779        .map(|d| d.as_millis() as u64)
780        .unwrap_or(0);
781
782    let (parts, body) = req.into_parts();
783    let uri = parts.uri;
784    let method = parts.method;
785    let client_headers = parts.headers;
786    let client_headers_entries_cache: OnceLock<Vec<HeaderEntry>> = OnceLock::new();
787
788    let session_id = extract_session_id(&client_headers);
789
790    proxy.config.maybe_reload_from_disk().await;
791    let cfg_snapshot = proxy.config.snapshot().await;
792    let lbs = proxy
793        .lbs_for_request(cfg_snapshot.as_ref(), session_id.as_deref())
794        .await;
795    if lbs.is_empty() {
796        let dur = start.elapsed().as_millis() as u64;
797        let status = StatusCode::BAD_GATEWAY;
798        let client_headers_entries = client_headers_entries_cache
799            .get_or_init(|| header_map_to_entries(&client_headers))
800            .clone();
801        let http_debug = if should_include_http_warn(status.as_u16()) {
802            Some(HttpDebugLog {
803                request_body_len: None,
804                upstream_request_body_len: None,
805                upstream_headers_ms: None,
806                upstream_first_chunk_ms: None,
807                upstream_body_read_ms: None,
808                upstream_error_class: Some("no_active_upstream_config".to_string()),
809                upstream_error_hint: Some(
810                    "未找到任何可用的上游配置(active_config 为空或 upstreams 为空)。".to_string(),
811                ),
812                upstream_cf_ray: None,
813                client_uri: uri.to_string(),
814                target_url: "-".to_string(),
815                client_headers: client_headers_entries,
816                upstream_request_headers: Vec::new(),
817                auth_resolution: None,
818                client_body: None,
819                upstream_request_body: None,
820                upstream_response_headers: None,
821                upstream_response_body: None,
822                upstream_error: Some("no active upstream config".to_string()),
823            })
824        } else {
825            None
826        };
827        log_request_with_debug(
828            proxy.service_name,
829            method.as_str(),
830            uri.path(),
831            status.as_u16(),
832            dur,
833            None,
834            "-",
835            None,
836            "-",
837            session_id.clone(),
838            None,
839            None,
840            None,
841            None,
842            http_debug,
843        );
844        return Err((status, "no active upstream config".to_string()));
845    }
846    let client_content_type = client_headers
847        .get("content-type")
848        .and_then(|v| v.to_str().ok());
849
850    // Detect streaming (SSE).
851    let is_stream = client_headers
852        .get("accept")
853        .and_then(|v| v.to_str().ok())
854        .map(|s| s.contains("text/event-stream"))
855        .unwrap_or(false);
856
857    let path = uri.path();
858    let is_responses_path = path.ends_with("/responses");
859    let is_user_turn = method == Method::POST && is_responses_path;
860    let is_codex_service = proxy.service_name == "codex";
861
862    let cwd = if let Some(id) = session_id.as_deref() {
863        proxy.state.resolve_session_cwd(id).await
864    } else {
865        None
866    };
867    if let Some(id) = session_id.as_deref() {
868        proxy.state.touch_session_override(id, started_at_ms).await;
869        proxy
870            .state
871            .touch_session_config_override(id, started_at_ms)
872            .await;
873    }
874
875    // Read request body and apply filters.
876    let raw_body = match to_bytes(body, 10 * 1024 * 1024).await {
877        Ok(b) => b,
878        Err(e) => {
879            let dur = start.elapsed().as_millis() as u64;
880            let status = StatusCode::BAD_REQUEST;
881            let err_str = e.to_string();
882            let client_headers_entries = client_headers_entries_cache
883                .get_or_init(|| header_map_to_entries(&client_headers))
884                .clone();
885            let http_debug = if should_include_http_warn(status.as_u16()) {
886                Some(HttpDebugLog {
887                    request_body_len: None,
888                    upstream_request_body_len: None,
889                    upstream_headers_ms: None,
890                    upstream_first_chunk_ms: None,
891                    upstream_body_read_ms: None,
892                    upstream_error_class: Some("client_body_read_error".to_string()),
893                    upstream_error_hint: Some(
894                        "读取客户端请求 body 失败(可能超过大小限制或连接中断)。".to_string(),
895                    ),
896                    upstream_cf_ray: None,
897                    client_uri: uri.to_string(),
898                    target_url: "-".to_string(),
899                    client_headers: client_headers_entries,
900                    upstream_request_headers: Vec::new(),
901                    auth_resolution: None,
902                    client_body: None,
903                    upstream_request_body: None,
904                    upstream_response_headers: None,
905                    upstream_response_body: None,
906                    upstream_error: Some(err_str.clone()),
907                })
908            } else {
909                None
910            };
911            log_request_with_debug(
912                proxy.service_name,
913                method.as_str(),
914                uri.path(),
915                status.as_u16(),
916                dur,
917                None,
918                "-",
919                None,
920                "-",
921                session_id.clone(),
922                cwd.clone(),
923                None,
924                None,
925                None,
926                http_debug,
927            );
928            return Err((status, err_str));
929        }
930    };
931    let original_effort = extract_reasoning_effort_from_request_body(&raw_body);
932    let override_effort = if let Some(id) = session_id.as_deref() {
933        proxy.state.get_session_effort_override(id).await
934    } else {
935        None
936    };
937    let effective_effort = override_effort.clone().or(original_effort.clone());
938
939    let body_for_upstream = if let Some(ref effort) = override_effort {
940        Bytes::from(
941            apply_reasoning_effort_override(&raw_body, effort)
942                .unwrap_or_else(|| raw_body.as_ref().to_vec()),
943        )
944    } else {
945        raw_body.clone()
946    };
947    let request_model = extract_model_from_request_body(body_for_upstream.as_ref());
948    let request_body_len = raw_body.len();
949
950    let debug_opt = http_debug_options();
951    let warn_opt = http_warn_options();
952    let debug_max = if debug_opt.enabled {
953        debug_opt.max_body_bytes
954    } else {
955        0
956    };
957    let warn_max = if warn_opt.enabled {
958        warn_opt.max_body_bytes
959    } else {
960        0
961    };
962    let request_body_previews = should_log_request_body_preview();
963    let client_body_debug = if request_body_previews && debug_max > 0 {
964        Some(make_body_preview(&raw_body, client_content_type, debug_max))
965    } else {
966        None
967    };
968    let client_body_warn = if request_body_previews && warn_max > 0 {
969        Some(make_body_preview(&raw_body, client_content_type, warn_max))
970    } else {
971        None
972    };
973
974    let request_id = proxy
975        .state
976        .begin_request(
977            proxy.service_name,
978            method.as_str(),
979            uri.path(),
980            session_id.clone(),
981            cwd.clone(),
982            request_model.clone(),
983            effective_effort.clone(),
984            started_at_ms,
985        )
986        .await;
987
988    let retry_cfg = cfg_snapshot.retry.resolve();
989    let plan = retry_plan(&retry_cfg);
990    let upstream_opt = &plan.upstream;
991    let provider_opt = &plan.provider;
992    let cooldown_backoff = crate::lb::CooldownBackoff {
993        factor: plan.cooldown_backoff_factor,
994        max_secs: plan.cooldown_backoff_max_secs,
995    };
996    log_retry_trace(serde_json::json!({
997        "event": "retry_options",
998        "service": proxy.service_name,
999        "request_id": request_id,
1000        "upstream": {
1001            "max_attempts": upstream_opt.max_attempts,
1002            "base_backoff_ms": upstream_opt.base_backoff_ms,
1003            "max_backoff_ms": upstream_opt.max_backoff_ms,
1004            "jitter_ms": upstream_opt.jitter_ms,
1005            "retry_status_ranges": upstream_opt.retry_status_ranges,
1006            "retry_error_classes": upstream_opt.retry_error_classes,
1007            "strategy": if upstream_opt.strategy == RetryStrategy::Failover { "failover" } else { "same_upstream" },
1008        },
1009        "provider": {
1010            "max_attempts": provider_opt.max_attempts,
1011            "base_backoff_ms": provider_opt.base_backoff_ms,
1012            "max_backoff_ms": provider_opt.max_backoff_ms,
1013            "jitter_ms": provider_opt.jitter_ms,
1014            "retry_status_ranges": provider_opt.retry_status_ranges,
1015            "retry_error_classes": provider_opt.retry_error_classes,
1016            "strategy": if provider_opt.strategy == RetryStrategy::Failover { "failover" } else { "same_upstream" },
1017        },
1018        "never_status_ranges": plan.never_status_ranges,
1019        "never_error_classes": plan.never_error_classes,
1020        "cloudflare_challenge_cooldown_secs": plan.cloudflare_challenge_cooldown_secs,
1021        "cloudflare_timeout_cooldown_secs": plan.cloudflare_timeout_cooldown_secs,
1022        "transport_cooldown_secs": plan.transport_cooldown_secs,
1023        "cooldown_backoff_factor": plan.cooldown_backoff_factor,
1024        "cooldown_backoff_max_secs": plan.cooldown_backoff_max_secs,
1025    }));
1026    let total_upstreams = lbs
1027        .iter()
1028        .map(|lb| lb.service.upstreams.len())
1029        .sum::<usize>();
1030    let mut avoid: HashMap<String, HashSet<usize>> = HashMap::new();
1031    let mut upstream_chain: Vec<String> = Vec::new();
1032    let mut avoided_total: usize = 0;
1033
1034    // --- Two-layer retry model ---
1035    //
1036    // Layer 1 (upstream): retry within the current provider/config (default: same_upstream).
1037    // Layer 2 (provider): after upstream retries are exhausted (or failure is not upstream-retryable),
1038    // fail over to another provider/config when eligible.
1039    //
1040    // Guardrails: never_on_status / never_on_class prevent amplifying obvious client-side mistakes.
1041    let mut tried_configs: HashSet<String> = HashSet::new();
1042    let strict_multi_config = lbs.len() > 1;
1043    let mut global_attempt: u32 = 0;
1044    let mut last_err: Option<(StatusCode, String)> = None;
1045
1046    for provider_attempt in 0..provider_opt.max_attempts {
1047        // Pick the next provider/config in the precomputed order, skipping ones we've already tried.
1048        let mut provider_lb: Option<LoadBalancer> = None;
1049        for lb in &lbs {
1050            if tried_configs.contains(&lb.service.name) {
1051                continue;
1052            }
1053            provider_lb = Some(lb.clone());
1054            break;
1055        }
1056        let Some(lb) = provider_lb else {
1057            break;
1058        };
1059        let config_name = lb.service.name.clone();
1060
1061        // Try all upstreams under this provider/config (respecting in-request avoid set).
1062        'upstreams: loop {
1063            let avoid_set = avoid.entry(config_name.clone()).or_default();
1064            let upstream_total = lb.service.upstreams.len();
1065            if upstream_total > 0 && avoid_set.len() >= upstream_total {
1066                break 'upstreams;
1067            }
1068
1069            // Select an eligible upstream inside this provider/config (skip unsupported models).
1070            let selected = loop {
1071                let upstream_total = lb.service.upstreams.len();
1072                if upstream_total > 0 && avoid_set.len() >= upstream_total {
1073                    break None;
1074                }
1075                let next = {
1076                    let avoid_ref: &HashSet<usize> = &*avoid_set;
1077                    if strict_multi_config {
1078                        lb.select_upstream_avoiding_strict(avoid_ref)
1079                    } else {
1080                        lb.select_upstream_avoiding(avoid_ref)
1081                    }
1082                };
1083                let Some(selected) = next else {
1084                    break None;
1085                };
1086
1087                if let Some(ref requested_model) = request_model {
1088                    let supported = model_routing::is_model_supported(
1089                        &selected.upstream.supported_models,
1090                        &selected.upstream.model_mapping,
1091                        requested_model,
1092                    );
1093                    if !supported {
1094                        upstream_chain.push(format!(
1095                            "{}:{} (idx={}) skipped_unsupported_model={}",
1096                            selected.config_name,
1097                            selected.upstream.base_url,
1098                            selected.index,
1099                            requested_model
1100                        ));
1101                        if avoid_set.insert(selected.index) {
1102                            avoided_total = avoided_total.saturating_add(1);
1103                        }
1104                        continue;
1105                    }
1106                }
1107
1108                break Some(selected);
1109            };
1110
1111            let Some(selected) = selected else {
1112                break 'upstreams;
1113            };
1114
1115            let mut model_note = "-".to_string();
1116            let mut body_for_selected = body_for_upstream.clone();
1117            if let Some(ref requested_model) = request_model {
1118                let effective_model = model_routing::effective_model(
1119                    &selected.upstream.model_mapping,
1120                    requested_model,
1121                );
1122                if effective_model != *requested_model {
1123                    if let Some(modified) =
1124                        apply_model_override(body_for_upstream.as_ref(), effective_model.as_str())
1125                    {
1126                        body_for_selected = Bytes::from(modified);
1127                    }
1128                    model_note = format!("{requested_model}->{effective_model}");
1129                } else {
1130                    model_note = requested_model.clone();
1131                }
1132            }
1133
1134            let filtered_body = proxy.filter.apply_bytes(body_for_selected);
1135            let upstream_request_body_len = filtered_body.len();
1136            let upstream_request_body_debug = if request_body_previews && debug_max > 0 {
1137                Some(make_body_preview(
1138                    &filtered_body,
1139                    client_content_type,
1140                    debug_max,
1141                ))
1142            } else {
1143                None
1144            };
1145            let upstream_request_body_warn = if request_body_previews && warn_max > 0 {
1146                Some(make_body_preview(
1147                    &filtered_body,
1148                    client_content_type,
1149                    warn_max,
1150                ))
1151            } else {
1152                None
1153            };
1154
1155            // Layer 1: retry the same upstream a small number of times.
1156            for upstream_attempt in 0..upstream_opt.max_attempts {
1157                global_attempt = global_attempt.saturating_add(1);
1158                let provider_id = selected.upstream.tags.get("provider_id").cloned();
1159                let mut avoid_for_config = avoid_set.iter().copied().collect::<Vec<_>>();
1160                avoid_for_config.sort_unstable();
1161
1162                log_retry_trace(serde_json::json!({
1163                    "event": "attempt_select",
1164                    "service": proxy.service_name,
1165                    "request_id": request_id,
1166                    "attempt": global_attempt,
1167                    "provider_attempt": provider_attempt + 1,
1168                    "upstream_attempt": upstream_attempt + 1,
1169                    "provider_max_attempts": provider_opt.max_attempts,
1170                    "upstream_max_attempts": upstream_opt.max_attempts,
1171                    "config_name": selected.config_name.as_str(),
1172                    "upstream_index": selected.index,
1173                    "upstream_base_url": selected.upstream.base_url.as_str(),
1174                    "provider_id": provider_id.as_deref(),
1175                    "avoid_for_config": avoid_for_config,
1176                    "avoided_total": avoided_total,
1177                    "total_upstreams": total_upstreams,
1178                    "model": model_note.as_str(),
1179                }));
1180
1181                let target_url = match proxy.build_target(&selected, &uri) {
1182                    Ok((url, _headers)) => url,
1183                    Err(e) => {
1184                        let err_str = e.to_string();
1185                        upstream_chain.push(format!(
1186                            "{}:{} (idx={}) target_build_error={} model={}",
1187                            selected.config_name,
1188                            selected.upstream.base_url,
1189                            selected.index,
1190                            err_str,
1191                            model_note.as_str()
1192                        ));
1193                        if avoid_set.insert(selected.index) {
1194                            avoided_total = avoided_total.saturating_add(1);
1195                        }
1196                        last_err = Some((StatusCode::BAD_GATEWAY, err_str));
1197                        break;
1198                    }
1199                };
1200
1201                // copy headers, stripping host/content-length and hop-by-hop.
1202                // auth headers:
1203                // - if upstream config provides a token/key, override client values;
1204                // - otherwise, preserve client Authorization / X-API-Key (required for requires_openai_auth=true providers).
1205                let mut headers = filter_request_headers(&client_headers);
1206                let client_has_auth = headers.contains_key("authorization");
1207                let (token, _token_src) = resolve_auth_token_with_source(
1208                    proxy.service_name,
1209                    &selected.upstream.auth,
1210                    client_has_auth,
1211                );
1212                if let Some(token) = token
1213                    && let Ok(v) = HeaderValue::from_str(&format!("Bearer {token}"))
1214                {
1215                    headers.insert(HeaderName::from_static("authorization"), v);
1216                }
1217
1218                let client_has_x_api_key = headers.contains_key("x-api-key");
1219                let (api_key, _api_key_src) = resolve_api_key_with_source(
1220                    proxy.service_name,
1221                    &selected.upstream.auth,
1222                    client_has_x_api_key,
1223                );
1224                if let Some(key) = api_key
1225                    && let Ok(v) = HeaderValue::from_str(&key)
1226                {
1227                    headers.insert(HeaderName::from_static("x-api-key"), v);
1228                }
1229
1230                let upstream_request_headers = headers.clone();
1231                proxy
1232                    .state
1233                    .update_request_route(
1234                        request_id,
1235                        selected.config_name.clone(),
1236                        provider_id.clone(),
1237                        selected.upstream.base_url.clone(),
1238                    )
1239                    .await;
1240
1241                let debug_base = if debug_max > 0 || warn_max > 0 {
1242                    Some(HttpDebugBase {
1243                        debug_max_body_bytes: debug_max,
1244                        warn_max_body_bytes: warn_max,
1245                        request_body_len,
1246                        upstream_request_body_len,
1247                        client_uri: uri.to_string(),
1248                        target_url: target_url.to_string(),
1249                        client_headers: client_headers_entries_cache
1250                            .get_or_init(|| header_map_to_entries(&client_headers))
1251                            .clone(),
1252                        upstream_request_headers: header_map_to_entries(&upstream_request_headers),
1253                        auth_resolution: None,
1254                        client_body_debug: client_body_debug.clone(),
1255                        upstream_request_body_debug: upstream_request_body_debug.clone(),
1256                        client_body_warn: client_body_warn.clone(),
1257                        upstream_request_body_warn: upstream_request_body_warn.clone(),
1258                    })
1259                } else {
1260                    None
1261                };
1262
1263                let builder = proxy
1264                    .client
1265                    .request(method.clone(), target_url.clone())
1266                    .headers(headers)
1267                    .body(filtered_body.clone());
1268
1269                let upstream_start = Instant::now();
1270                let resp = match builder.send().await {
1271                    Ok(r) => r,
1272                    Err(e) => {
1273                        let err_str = format_reqwest_error_for_retry_chain(&e);
1274                        upstream_chain.push(format!(
1275                            "{}:{} (idx={}) transport_error={} model={}",
1276                            selected.config_name,
1277                            selected.upstream.base_url,
1278                            selected.index,
1279                            err_str,
1280                            model_note.as_str()
1281                        ));
1282                        // Upstream-layer retry only for classified transient transport errors.
1283                        let can_retry_upstream = upstream_attempt + 1 < upstream_opt.max_attempts
1284                            && should_retry_class(upstream_opt, Some("upstream_transport_error"));
1285                        if can_retry_upstream {
1286                            backoff_sleep(upstream_opt, upstream_attempt).await;
1287                            continue;
1288                        }
1289
1290                        lb.penalize_with_backoff(
1291                            selected.index,
1292                            plan.transport_cooldown_secs,
1293                            "upstream_transport_error",
1294                            cooldown_backoff,
1295                        );
1296                        if avoid_set.insert(selected.index) {
1297                            avoided_total = avoided_total.saturating_add(1);
1298                        }
1299                        last_err = Some((StatusCode::BAD_GATEWAY, err_str));
1300                        break;
1301                    }
1302                };
1303
1304                let upstream_headers_ms = upstream_start.elapsed().as_millis() as u64;
1305                let status = resp.status();
1306                let success = status.is_success();
1307                let resp_headers = resp.headers().clone();
1308                let resp_headers_filtered = filter_response_headers(&resp_headers);
1309
1310                if is_stream && success {
1311                    lb.record_result_with_backoff(
1312                        selected.index,
1313                        true,
1314                        crate::lb::COOLDOWN_SECS,
1315                        cooldown_backoff,
1316                    );
1317                    let retry = retry_info_for_chain(&upstream_chain);
1318                    return Ok(build_sse_success_response(
1319                        &proxy,
1320                        lb.clone(),
1321                        selected.clone(),
1322                        resp,
1323                        SseSuccessMeta {
1324                            status,
1325                            resp_headers,
1326                            resp_headers_filtered,
1327                            start,
1328                            started_at_ms,
1329                            upstream_start,
1330                            upstream_headers_ms,
1331                            request_body_len,
1332                            upstream_request_body_len,
1333                            debug_base,
1334                            retry,
1335                            session_id: session_id.clone(),
1336                            cwd: cwd.clone(),
1337                            effective_effort: effective_effort.clone(),
1338                            request_id,
1339                            is_user_turn,
1340                            is_codex_service,
1341                            transport_cooldown_secs: plan.transport_cooldown_secs,
1342                            cooldown_backoff,
1343                            method: method.clone(),
1344                            path: uri.path().to_string(),
1345                        },
1346                    )
1347                    .await);
1348                }
1349
1350                let bytes = match resp.bytes().await {
1351                    Ok(b) => b,
1352                    Err(e) => {
1353                        let err_str = format_reqwest_error_for_retry_chain(&e);
1354                        upstream_chain.push(format!(
1355                            "{}:{} (idx={}) body_read_error={} model={}",
1356                            selected.config_name,
1357                            selected.upstream.base_url,
1358                            selected.index,
1359                            err_str,
1360                            model_note.as_str()
1361                        ));
1362                        let can_retry_upstream = upstream_attempt + 1 < upstream_opt.max_attempts
1363                            && should_retry_class(upstream_opt, Some("upstream_transport_error"));
1364                        if can_retry_upstream {
1365                            backoff_sleep(upstream_opt, upstream_attempt).await;
1366                            continue;
1367                        }
1368                        lb.penalize_with_backoff(
1369                            selected.index,
1370                            plan.transport_cooldown_secs,
1371                            "upstream_body_read_error",
1372                            cooldown_backoff,
1373                        );
1374                        if avoid_set.insert(selected.index) {
1375                            avoided_total = avoided_total.saturating_add(1);
1376                        }
1377                        last_err = Some((StatusCode::BAD_GATEWAY, err_str));
1378                        break;
1379                    }
1380                };
1381
1382                let _upstream_body_read_ms = upstream_start.elapsed().as_millis() as u64;
1383                let dur = start.elapsed().as_millis() as u64;
1384
1385                let status_code = status.as_u16();
1386                let (cls, _hint, _cf_ray) =
1387                    classify_upstream_response(status_code, &resp_headers, bytes.as_ref());
1388                let never_retry = should_never_retry(&plan, status_code, cls.as_deref());
1389
1390                upstream_chain.push(format!(
1391                    "{} (idx={}) status={} class={} model={}",
1392                    selected.upstream.base_url,
1393                    selected.index,
1394                    status_code,
1395                    cls.as_deref().unwrap_or("-"),
1396                    model_note.as_str()
1397                ));
1398
1399                if success {
1400                    lb.record_result_with_backoff(
1401                        selected.index,
1402                        true,
1403                        crate::lb::COOLDOWN_SECS,
1404                        cooldown_backoff,
1405                    );
1406
1407                    let usage = extract_usage_from_bytes(&bytes);
1408                    let usage_for_log = usage.clone();
1409                    let retry = retry_info_for_chain(&upstream_chain);
1410                    let retry_for_log = retry.clone();
1411                    proxy
1412                        .state
1413                        .finish_request(crate::state::FinishRequestParams {
1414                            id: request_id,
1415                            status_code,
1416                            duration_ms: dur,
1417                            ended_at_ms: started_at_ms + dur,
1418                            usage,
1419                            retry,
1420                            ttfb_ms: Some(upstream_headers_ms),
1421                        })
1422                        .await;
1423
1424                    log_request_with_debug(
1425                        proxy.service_name,
1426                        method.as_str(),
1427                        uri.path(),
1428                        status_code,
1429                        dur,
1430                        Some(upstream_headers_ms),
1431                        &selected.config_name,
1432                        provider_id.clone(),
1433                        &selected.upstream.base_url,
1434                        session_id.clone(),
1435                        cwd.clone(),
1436                        effective_effort.clone(),
1437                        usage_for_log,
1438                        retry_for_log,
1439                        None,
1440                    );
1441
1442                    let mut builder = Response::builder().status(status);
1443                    for (name, value) in resp_headers_filtered.iter() {
1444                        builder = builder.header(name, value);
1445                    }
1446                    return Ok(builder.body(Body::from(bytes)).unwrap());
1447                }
1448
1449                if never_retry {
1450                    lb.record_result_with_backoff(
1451                        selected.index,
1452                        false,
1453                        crate::lb::COOLDOWN_SECS,
1454                        cooldown_backoff,
1455                    );
1456
1457                    let retry = retry_info_for_chain(&upstream_chain);
1458                    let retry_for_log = retry.clone();
1459                    proxy
1460                        .state
1461                        .finish_request(crate::state::FinishRequestParams {
1462                            id: request_id,
1463                            status_code,
1464                            duration_ms: dur,
1465                            ended_at_ms: started_at_ms + dur,
1466                            usage: None,
1467                            retry,
1468                            ttfb_ms: Some(upstream_headers_ms),
1469                        })
1470                        .await;
1471
1472                    log_request_with_debug(
1473                        proxy.service_name,
1474                        method.as_str(),
1475                        uri.path(),
1476                        status_code,
1477                        dur,
1478                        Some(upstream_headers_ms),
1479                        &selected.config_name,
1480                        provider_id.clone(),
1481                        &selected.upstream.base_url,
1482                        session_id.clone(),
1483                        cwd.clone(),
1484                        effective_effort.clone(),
1485                        None,
1486                        retry_for_log,
1487                        None,
1488                    );
1489
1490                    let mut builder = Response::builder().status(status);
1491                    for (name, value) in resp_headers_filtered.iter() {
1492                        builder = builder.header(name, value);
1493                    }
1494                    return Ok(builder.body(Body::from(bytes)).unwrap());
1495                }
1496
1497                let upstream_retryable = should_retry_status(upstream_opt, status_code)
1498                    || should_retry_class(upstream_opt, cls.as_deref());
1499                let can_retry_upstream =
1500                    upstream_retryable && upstream_attempt + 1 < upstream_opt.max_attempts;
1501                if can_retry_upstream {
1502                    retry_sleep(upstream_opt, upstream_attempt, &resp_headers).await;
1503                    continue;
1504                }
1505
1506                // Upstream-layer exhausted; decide whether to fail over (first within config, then to another config).
1507                let provider_retryable = should_retry_status(provider_opt, status_code)
1508                    || should_retry_class(provider_opt, cls.as_deref());
1509                if provider_retryable {
1510                    lb.penalize_with_backoff(
1511                        selected.index,
1512                        plan.transport_cooldown_secs,
1513                        &format!("status_{}", status_code),
1514                        cooldown_backoff,
1515                    );
1516                    last_err = Some((status, String::from_utf8_lossy(bytes.as_ref()).to_string()));
1517
1518                    if avoid_set.insert(selected.index) {
1519                        avoided_total = avoided_total.saturating_add(1);
1520                    }
1521                    break;
1522                }
1523
1524                // Not retryable for provider failover either: return the error as-is.
1525                let retry = retry_info_for_chain(&upstream_chain);
1526                let retry_for_log = retry.clone();
1527                proxy
1528                    .state
1529                    .finish_request(crate::state::FinishRequestParams {
1530                        id: request_id,
1531                        status_code,
1532                        duration_ms: dur,
1533                        ended_at_ms: started_at_ms + dur,
1534                        usage: None,
1535                        retry,
1536                        ttfb_ms: Some(upstream_headers_ms),
1537                    })
1538                    .await;
1539
1540                log_request_with_debug(
1541                    proxy.service_name,
1542                    method.as_str(),
1543                    uri.path(),
1544                    status_code,
1545                    dur,
1546                    Some(upstream_headers_ms),
1547                    &selected.config_name,
1548                    provider_id.clone(),
1549                    &selected.upstream.base_url,
1550                    session_id.clone(),
1551                    cwd.clone(),
1552                    effective_effort.clone(),
1553                    None,
1554                    retry_for_log,
1555                    None,
1556                );
1557
1558                let mut builder = Response::builder().status(status);
1559                for (name, value) in resp_headers_filtered.iter() {
1560                    builder = builder.header(name, value);
1561                }
1562                return Ok(builder.body(Body::from(bytes)).unwrap());
1563            }
1564
1565            // If we don't have any more upstreams under this provider/config, move to next provider;
1566            // otherwise, continue selecting another upstream under the same provider/config.
1567            let upstream_total = lb.service.upstreams.len();
1568            if upstream_total > 0 && avoid_set.len() >= upstream_total {
1569                break 'upstreams;
1570            }
1571            continue 'upstreams;
1572        }
1573
1574        tried_configs.insert(config_name);
1575
1576        // Provider layer: optional backoff between providers (usually 0).
1577        if provider_opt.base_backoff_ms > 0 {
1578            backoff_sleep(provider_opt, provider_attempt).await;
1579        }
1580    }
1581
1582    // If we reach here, all provider attempts are exhausted.
1583    if let Some((status, msg)) = last_err {
1584        let dur = start.elapsed().as_millis() as u64;
1585        log_request_with_debug(
1586            proxy.service_name,
1587            method.as_str(),
1588            uri.path(),
1589            status.as_u16(),
1590            dur,
1591            None,
1592            "-",
1593            None,
1594            "-",
1595            session_id.clone(),
1596            cwd.clone(),
1597            effective_effort.clone(),
1598            None,
1599            retry_info_for_chain(&upstream_chain),
1600            None,
1601        );
1602        let retry = retry_info_for_chain(&upstream_chain);
1603        proxy
1604            .state
1605            .finish_request(crate::state::FinishRequestParams {
1606                id: request_id,
1607                status_code: status.as_u16(),
1608                duration_ms: dur,
1609                ended_at_ms: started_at_ms + dur,
1610                usage: None,
1611                retry,
1612                ttfb_ms: None,
1613            })
1614            .await;
1615        return Err((status, msg));
1616    }
1617
1618    return Err((
1619        StatusCode::BAD_GATEWAY,
1620        "no upstreams available".to_string(),
1621    ));
1622
1623    #[cfg(any())]
1624    {
1625        for attempt_index in 0..retry_opt.max_attempts {
1626            let avoided_total = avoid.values().map(|s| s.len()).sum::<usize>();
1627            if total_upstreams > 0 && avoided_total >= total_upstreams {
1628                upstream_chain.push(format!("all_upstreams_avoided total={total_upstreams}"));
1629                break;
1630            }
1631
1632            let strict_multi_config = lbs.len() > 1;
1633
1634            let mut chosen: Option<(LoadBalancer, SelectedUpstream)> = None;
1635            for lb in &lbs {
1636                let cfg_name = lb.service.name.clone();
1637                let avoid_set = avoid.entry(cfg_name.clone()).or_default();
1638                loop {
1639                    let upstream_total = lb.service.upstreams.len();
1640                    if upstream_total > 0 && avoid_set.len() >= upstream_total {
1641                        break;
1642                    }
1643                    let next = {
1644                        let avoid_ref: &HashSet<usize> = &*avoid_set;
1645                        if strict_multi_config {
1646                            lb.select_upstream_avoiding_strict(avoid_ref)
1647                        } else {
1648                            lb.select_upstream_avoiding(avoid_ref)
1649                        }
1650                    };
1651                    let Some(selected) = next else {
1652                        break;
1653                    };
1654
1655                    if let Some(ref requested_model) = request_model {
1656                        let supported = model_routing::is_model_supported(
1657                            &selected.upstream.supported_models,
1658                            &selected.upstream.model_mapping,
1659                            requested_model,
1660                        );
1661                        if !supported {
1662                            upstream_chain.push(format!(
1663                                "{}:{} (idx={}) skipped_unsupported_model={}",
1664                                selected.config_name,
1665                                selected.upstream.base_url,
1666                                selected.index,
1667                                requested_model
1668                            ));
1669                            avoid_set.insert(selected.index);
1670                            continue;
1671                        }
1672                    }
1673
1674                    chosen = Some((lb.clone(), selected));
1675                    break;
1676                }
1677                if chosen.is_some() {
1678                    break;
1679                }
1680            }
1681
1682            // When we have multiple config candidates, prefer skipping configs that are fully cooled down.
1683            // However, if *all* configs are cooled/unavailable, fall back to the original "always pick one"
1684            // behavior to avoid a hard outage.
1685            if chosen.is_none() && strict_multi_config {
1686                for lb in &lbs {
1687                    let cfg_name = lb.service.name.clone();
1688                    let avoid_set = avoid.entry(cfg_name.clone()).or_default();
1689                    let upstream_total = lb.service.upstreams.len();
1690                    if upstream_total > 0 && avoid_set.len() >= upstream_total {
1691                        continue;
1692                    }
1693                    let avoid_ref: &HashSet<usize> = &*avoid_set;
1694                    if let Some(selected) = lb.select_upstream_avoiding(avoid_ref) {
1695                        chosen = Some((lb.clone(), selected));
1696                        break;
1697                    }
1698                }
1699            }
1700
1701            let Some((lb, selected)) = chosen else {
1702                log_retry_trace(serde_json::json!({
1703                    "event": "attempt_no_upstream",
1704                    "service": proxy.service_name,
1705                    "request_id": request_id,
1706                    "attempt": attempt_index + 1,
1707                    "max_attempts": retry_opt.max_attempts,
1708                    "avoid": avoid.iter().map(|(k, v)| serde_json::json!({
1709                        "config": k,
1710                        "indices": v.iter().copied().collect::<Vec<_>>(),
1711                    })).collect::<Vec<_>>(),
1712                    "total_upstreams": total_upstreams,
1713                    "model": request_model.clone(),
1714                }));
1715                let dur = start.elapsed().as_millis() as u64;
1716                let status = if request_model.is_some() {
1717                    StatusCode::NOT_FOUND
1718                } else {
1719                    StatusCode::BAD_GATEWAY
1720                };
1721                log_request_with_debug(
1722                    proxy.service_name,
1723                    method.as_str(),
1724                    uri.path(),
1725                    status.as_u16(),
1726                    dur,
1727                    None,
1728                    "-",
1729                    None,
1730                    "-",
1731                    session_id.clone(),
1732                    cwd.clone(),
1733                    effective_effort.clone(),
1734                    None,
1735                    retry_info_for_chain(&upstream_chain),
1736                    None,
1737                );
1738                let retry = retry_info_for_chain(&upstream_chain);
1739                proxy
1740                    .state
1741                    .finish_request(crate::state::FinishRequestParams {
1742                        id: request_id,
1743                        status_code: status.as_u16(),
1744                        duration_ms: dur,
1745                        ended_at_ms: started_at_ms + dur,
1746                        usage: None,
1747                        retry,
1748                        ttfb_ms: None,
1749                    })
1750                    .await;
1751                if let Some(model) = request_model.as_deref() {
1752                    return Err((
1753                        status,
1754                        format!("no upstreams support requested model '{model}'"),
1755                    ));
1756                }
1757                return Err((status, "no upstreams available".to_string()));
1758            };
1759
1760            let selected_config_name = selected.config_name.clone();
1761            let selected_upstream_index = selected.index;
1762            let selected_upstream_base_url = selected.upstream.base_url.clone();
1763            let provider_id = selected.upstream.tags.get("provider_id").cloned();
1764            log_retry_trace(serde_json::json!({
1765                "event": "attempt_select",
1766                "service": proxy.service_name,
1767                "request_id": request_id,
1768                "attempt": attempt_index + 1,
1769                "max_attempts": retry_opt.max_attempts,
1770                "strategy": if retry_failover { "failover" } else { "same_upstream" },
1771                "config_name": selected_config_name,
1772                "upstream_index": selected_upstream_index,
1773                "upstream_base_url": selected_upstream_base_url,
1774                "provider_id": provider_id.clone(),
1775                "model": request_model.clone(),
1776                "lb_state": lb_state_snapshot_json(&lb),
1777                "avoid_for_config": avoid.get(&selected.config_name).map(|s| s.iter().copied().collect::<Vec<_>>()),
1778                "avoided_total": avoid.values().map(|s| s.len()).sum::<usize>(),
1779                "total_upstreams": total_upstreams,
1780            }));
1781
1782            let mut model_note = "-".to_string();
1783            let mut body_for_selected = body_for_upstream.clone();
1784            if let Some(ref requested_model) = request_model {
1785                let effective_model = model_routing::effective_model(
1786                    &selected.upstream.model_mapping,
1787                    requested_model,
1788                );
1789                if effective_model != *requested_model {
1790                    if let Some(modified) =
1791                        apply_model_override(body_for_upstream.as_ref(), effective_model.as_str())
1792                    {
1793                        body_for_selected = Bytes::from(modified);
1794                    }
1795                    model_note = format!("{requested_model}->{effective_model}");
1796                } else {
1797                    model_note = requested_model.clone();
1798                }
1799            }
1800
1801            let filtered_body = proxy.filter.apply_bytes(body_for_selected);
1802            let upstream_request_body_len = filtered_body.len();
1803            let upstream_request_body_debug = if request_body_previews && debug_max > 0 {
1804                Some(make_body_preview(
1805                    &filtered_body,
1806                    client_content_type,
1807                    debug_max,
1808                ))
1809            } else {
1810                None
1811            };
1812            let upstream_request_body_warn = if request_body_previews && warn_max > 0 {
1813                Some(make_body_preview(
1814                    &filtered_body,
1815                    client_content_type,
1816                    warn_max,
1817                ))
1818            } else {
1819                None
1820            };
1821
1822            let target_url = match proxy.build_target(&selected, &uri) {
1823                Ok((url, _headers)) => url,
1824                Err(e) => {
1825                    lb.record_result_with_backoff(
1826                        selected.index,
1827                        false,
1828                        crate::lb::COOLDOWN_SECS,
1829                        cooldown_backoff,
1830                    );
1831                    let err_str = e.to_string();
1832                    upstream_chain.push(format!(
1833                        "{}:{} (idx={}) target_build_error={} model={}",
1834                        selected.config_name,
1835                        selected.upstream.base_url,
1836                        selected.index,
1837                        err_str,
1838                        model_note.as_str()
1839                    ));
1840                    avoid
1841                        .entry(selected.config_name.clone())
1842                        .or_default()
1843                        .insert(selected.index);
1844
1845                    let can_retry = attempt_index + 1 < retry_opt.max_attempts;
1846                    if can_retry {
1847                        backoff_sleep(&retry_opt, attempt_index).await;
1848                        continue;
1849                    }
1850
1851                    let dur = start.elapsed().as_millis() as u64;
1852                    let status = StatusCode::BAD_GATEWAY;
1853                    let client_headers_entries = client_headers_entries_cache
1854                        .get_or_init(|| header_map_to_entries(&client_headers))
1855                        .clone();
1856                    let http_debug = if should_include_http_warn(status.as_u16()) {
1857                        Some(HttpDebugLog {
1858                            request_body_len: Some(request_body_len),
1859                            upstream_request_body_len: Some(upstream_request_body_len),
1860                            upstream_headers_ms: None,
1861                            upstream_first_chunk_ms: None,
1862                            upstream_body_read_ms: None,
1863                            upstream_error_class: Some("target_build_error".to_string()),
1864                            upstream_error_hint: Some(
1865                                "构造上游 target_url 失败(通常是 base_url 配置错误)。"
1866                                    .to_string(),
1867                            ),
1868                            upstream_cf_ray: None,
1869                            client_uri: uri.to_string(),
1870                            target_url: "-".to_string(),
1871                            client_headers: client_headers_entries,
1872                            upstream_request_headers: Vec::new(),
1873                            auth_resolution: None,
1874                            client_body: client_body_warn.clone(),
1875                            upstream_request_body: upstream_request_body_warn.clone(),
1876                            upstream_response_headers: None,
1877                            upstream_response_body: None,
1878                            upstream_error: Some(err_str.clone()),
1879                        })
1880                    } else {
1881                        None
1882                    };
1883                    log_request_with_debug(
1884                        proxy.service_name,
1885                        method.as_str(),
1886                        uri.path(),
1887                        status.as_u16(),
1888                        dur,
1889                        None,
1890                        &selected.config_name,
1891                        selected.upstream.tags.get("provider_id").cloned(),
1892                        &selected.upstream.base_url,
1893                        session_id.clone(),
1894                        cwd.clone(),
1895                        effective_effort.clone(),
1896                        None,
1897                        retry_info_for_chain(&upstream_chain),
1898                        http_debug,
1899                    );
1900                    let retry = retry_info_for_chain(&upstream_chain);
1901                    proxy
1902                        .state
1903                        .finish_request(crate::state::FinishRequestParams {
1904                            id: request_id,
1905                            status_code: status.as_u16(),
1906                            duration_ms: dur,
1907                            ended_at_ms: started_at_ms + dur,
1908                            usage: None,
1909                            retry,
1910                            ttfb_ms: None,
1911                        })
1912                        .await;
1913                    return Err((status, err_str));
1914                }
1915            };
1916
1917            // copy headers, stripping host/content-length and hop-by-hop.
1918            // auth headers:
1919            // - if upstream config provides a token/key, override client values;
1920            // - otherwise, preserve client Authorization / X-API-Key (required for requires_openai_auth=true providers).
1921            let mut headers = filter_request_headers(&client_headers);
1922            let client_has_auth = headers.contains_key("authorization");
1923            let (token, token_src) = resolve_auth_token_with_source(
1924                proxy.service_name,
1925                &selected.upstream.auth,
1926                client_has_auth,
1927            );
1928            if let Some(token) = token
1929                && let Ok(v) = HeaderValue::from_str(&format!("Bearer {token}"))
1930            {
1931                headers.insert(HeaderName::from_static("authorization"), v);
1932            }
1933
1934            let client_has_x_api_key = headers.contains_key("x-api-key");
1935            let (api_key, api_key_src) = resolve_api_key_with_source(
1936                proxy.service_name,
1937                &selected.upstream.auth,
1938                client_has_x_api_key,
1939            );
1940            if let Some(key) = api_key
1941                && let Ok(v) = HeaderValue::from_str(&key)
1942            {
1943                headers.insert(HeaderName::from_static("x-api-key"), v);
1944            }
1945
1946            let upstream_request_headers = headers.clone();
1947            proxy
1948                .state
1949                .update_request_route(
1950                    request_id,
1951                    selected.config_name.clone(),
1952                    provider_id.clone(),
1953                    selected.upstream.base_url.clone(),
1954                )
1955                .await;
1956            let auth_resolution = AuthResolutionLog {
1957                authorization: Some(token_src),
1958                x_api_key: Some(api_key_src),
1959            };
1960
1961            let debug_base = if debug_max > 0 || warn_max > 0 {
1962                Some(HttpDebugBase {
1963                    debug_max_body_bytes: debug_max,
1964                    warn_max_body_bytes: warn_max,
1965                    request_body_len,
1966                    upstream_request_body_len,
1967                    client_uri: uri.to_string(),
1968                    target_url: target_url.to_string(),
1969                    client_headers: client_headers_entries_cache
1970                        .get_or_init(|| header_map_to_entries(&client_headers))
1971                        .clone(),
1972                    upstream_request_headers: header_map_to_entries(&upstream_request_headers),
1973                    auth_resolution: Some(auth_resolution),
1974                    client_body_debug: client_body_debug.clone(),
1975                    upstream_request_body_debug: upstream_request_body_debug.clone(),
1976                    client_body_warn: client_body_warn.clone(),
1977                    upstream_request_body_warn: upstream_request_body_warn.clone(),
1978                })
1979            } else {
1980                None
1981            };
1982
1983            // 详细转发日志仅在 debug 级别输出,避免刷屏。
1984            tracing::debug!(
1985                "forwarding {} {} to {} ({})",
1986                method,
1987                uri.path(),
1988                target_url,
1989                selected.config_name
1990            );
1991
1992            let builder = proxy
1993                .client
1994                .request(method.clone(), target_url.clone())
1995                .headers(headers)
1996                .body(filtered_body.clone());
1997
1998            let upstream_start = Instant::now();
1999            let resp = match builder.send().await {
2000                Ok(r) => r,
2001                Err(e) => {
2002                    log_retry_trace(serde_json::json!({
2003                        "event": "attempt_transport_error",
2004                        "service": proxy.service_name,
2005                        "request_id": request_id,
2006                        "attempt": attempt_index + 1,
2007                        "max_attempts": retry_opt.max_attempts,
2008                        "strategy": if retry_failover { "failover" } else { "same_upstream" },
2009                        "config_name": selected.config_name.as_str(),
2010                        "upstream_index": selected.index,
2011                        "upstream_base_url": selected.upstream.base_url.as_str(),
2012                        "provider_id": provider_id.as_deref(),
2013                        "error": format_reqwest_error_for_retry_chain(&e),
2014                    }));
2015                    if retry_failover {
2016                        lb.record_result_with_backoff(
2017                            selected.index,
2018                            false,
2019                            crate::lb::COOLDOWN_SECS,
2020                            cooldown_backoff,
2021                        );
2022                    }
2023                    let err_str = format_reqwest_error_for_retry_chain(&e);
2024                    upstream_chain.push(format!(
2025                        "{}:{} (idx={}) transport_error={} model={}",
2026                        selected.config_name,
2027                        selected.upstream.base_url,
2028                        selected.index,
2029                        err_str,
2030                        model_note.as_str()
2031                    ));
2032                    let can_retry = attempt_index + 1 < retry_opt.max_attempts
2033                        && should_retry_class(&retry_opt, Some("upstream_transport_error"));
2034                    if can_retry {
2035                        if retry_failover {
2036                            lb.penalize_with_backoff(
2037                                selected.index,
2038                                retry_opt.transport_cooldown_secs,
2039                                "upstream_transport_error",
2040                                cooldown_backoff,
2041                            );
2042                            avoid
2043                                .entry(selected.config_name.clone())
2044                                .or_default()
2045                                .insert(selected.index);
2046                        }
2047                        backoff_sleep(&retry_opt, attempt_index).await;
2048                        continue;
2049                    }
2050
2051                    // Even when we have no remaining in-request retries, mark this upstream as cooled down
2052                    // so external retries (e.g. Codex request_max_retries) can fail over to another upstream.
2053                    if should_retry_class(&retry_opt, Some("upstream_transport_error")) {
2054                        lb.penalize_with_backoff(
2055                            selected.index,
2056                            retry_opt.transport_cooldown_secs,
2057                            "upstream_transport_error_final",
2058                            cooldown_backoff,
2059                        );
2060                    }
2061
2062                    let dur = start.elapsed().as_millis() as u64;
2063                    let status_code = StatusCode::BAD_GATEWAY.as_u16();
2064                    let upstream_headers_ms = upstream_start.elapsed().as_millis() as u64;
2065                    let retry = retry_info_for_chain(&upstream_chain);
2066                    let http_debug_warn = debug_base.as_ref().and_then(|b| {
2067                    if b.warn_max_body_bytes == 0 {
2068                        return None;
2069                    }
2070                    Some(HttpDebugLog {
2071                        request_body_len: Some(b.request_body_len),
2072                        upstream_request_body_len: Some(b.upstream_request_body_len),
2073                        upstream_headers_ms: Some(upstream_headers_ms),
2074                        upstream_first_chunk_ms: None,
2075                        upstream_body_read_ms: None,
2076                        upstream_error_class: Some("upstream_transport_error".to_string()),
2077                        upstream_error_hint: Some(
2078                            "上游连接/发送请求失败(reqwest 错误);请检查网络、DNS、TLS、代理设置或上游可用性。".to_string(),
2079                        ),
2080                        upstream_cf_ray: None,
2081                        client_uri: b.client_uri.clone(),
2082                        target_url: b.target_url.clone(),
2083                        client_headers: b.client_headers.clone(),
2084                        upstream_request_headers: b.upstream_request_headers.clone(),
2085                        auth_resolution: b.auth_resolution.clone(),
2086                        client_body: b.client_body_warn.clone(),
2087                        upstream_request_body: b.upstream_request_body_warn.clone(),
2088                        upstream_response_headers: None,
2089                        upstream_response_body: None,
2090                        upstream_error: Some(err_str.clone()),
2091                    })
2092                });
2093                    if should_include_http_warn(status_code)
2094                        && let Some(h) = http_debug_warn.as_ref()
2095                    {
2096                        warn_http_debug(status_code, h);
2097                    }
2098                    let http_debug = if should_include_http_debug(status_code) {
2099                        debug_base.as_ref().and_then(|b| {
2100                        if b.debug_max_body_bytes == 0 {
2101                            return None;
2102                        }
2103                        Some(HttpDebugLog {
2104                            request_body_len: Some(b.request_body_len),
2105                            upstream_request_body_len: Some(b.upstream_request_body_len),
2106                            upstream_headers_ms: Some(upstream_headers_ms),
2107                            upstream_first_chunk_ms: None,
2108                            upstream_body_read_ms: None,
2109                            upstream_error_class: Some("upstream_transport_error".to_string()),
2110                            upstream_error_hint: Some(
2111                                "上游连接/发送请求失败(reqwest 错误);请检查网络、DNS、TLS、代理设置或上游可用性。".to_string(),
2112                            ),
2113                            upstream_cf_ray: None,
2114                            client_uri: b.client_uri.clone(),
2115                            target_url: b.target_url.clone(),
2116                            client_headers: b.client_headers.clone(),
2117                            upstream_request_headers: b.upstream_request_headers.clone(),
2118                            auth_resolution: b.auth_resolution.clone(),
2119                            client_body: b.client_body_debug.clone(),
2120                            upstream_request_body: b.upstream_request_body_debug.clone(),
2121                            upstream_response_headers: None,
2122                            upstream_response_body: None,
2123                            upstream_error: Some(err_str.clone()),
2124                        })
2125                    })
2126                    } else if should_include_http_warn(status_code) {
2127                        http_debug_warn.clone()
2128                    } else {
2129                        None
2130                    };
2131                    log_request_with_debug(
2132                        proxy.service_name,
2133                        method.as_str(),
2134                        uri.path(),
2135                        status_code,
2136                        dur,
2137                        None,
2138                        &selected.config_name,
2139                        selected.upstream.tags.get("provider_id").cloned(),
2140                        &selected.upstream.base_url,
2141                        session_id.clone(),
2142                        cwd.clone(),
2143                        effective_effort.clone(),
2144                        None,
2145                        retry.clone(),
2146                        http_debug,
2147                    );
2148                    proxy
2149                        .state
2150                        .finish_request(crate::state::FinishRequestParams {
2151                            id: request_id,
2152                            status_code,
2153                            duration_ms: dur,
2154                            ended_at_ms: started_at_ms + dur,
2155                            usage: None,
2156                            retry,
2157                            ttfb_ms: None,
2158                        })
2159                        .await;
2160                    return Err((StatusCode::BAD_GATEWAY, e.to_string()));
2161                }
2162            };
2163
2164            let upstream_headers_ms = upstream_start.elapsed().as_millis() as u64;
2165            let status = resp.status();
2166            let success = status.is_success();
2167            let resp_headers = resp.headers().clone();
2168            let resp_headers_filtered = filter_response_headers(&resp_headers);
2169
2170            // 对用户对话轮次输出更有信息量的 info 日志(仅最终返回时打印,避免重试期间刷屏)。
2171
2172            if is_stream && success {
2173                lb.record_result_with_backoff(
2174                    selected.index,
2175                    true,
2176                    crate::lb::COOLDOWN_SECS,
2177                    cooldown_backoff,
2178                );
2179                upstream_chain.push(format!(
2180                    "{} (idx={}) status={} model={}",
2181                    selected.upstream.base_url,
2182                    selected.index,
2183                    status.as_u16(),
2184                    model_note.as_str()
2185                ));
2186                let retry = retry_info_for_chain(&upstream_chain);
2187
2188                return Ok(build_sse_success_response(
2189                    &proxy,
2190                    lb.clone(),
2191                    selected,
2192                    resp,
2193                    SseSuccessMeta {
2194                        status,
2195                        resp_headers,
2196                        resp_headers_filtered,
2197                        start,
2198                        started_at_ms,
2199                        upstream_start,
2200                        upstream_headers_ms,
2201                        request_body_len,
2202                        upstream_request_body_len,
2203                        debug_base,
2204                        retry,
2205                        session_id: session_id.clone(),
2206                        cwd: cwd.clone(),
2207                        effective_effort: effective_effort.clone(),
2208                        request_id,
2209                        is_user_turn,
2210                        is_codex_service,
2211                        transport_cooldown_secs: retry_opt.transport_cooldown_secs,
2212                        cooldown_backoff,
2213                        method: method.clone(),
2214                        path: uri.path().to_string(),
2215                    },
2216                )
2217                .await);
2218            } else {
2219                let bytes = match resp.bytes().await {
2220                    Ok(b) => b,
2221                    Err(e) => {
2222                        log_retry_trace(serde_json::json!({
2223                            "event": "attempt_body_read_error",
2224                            "service": proxy.service_name,
2225                            "request_id": request_id,
2226                            "attempt": attempt_index + 1,
2227                            "max_attempts": retry_opt.max_attempts,
2228                            "strategy": if retry_failover { "failover" } else { "same_upstream" },
2229                            "config_name": selected.config_name.as_str(),
2230                            "upstream_index": selected.index,
2231                            "upstream_base_url": selected.upstream.base_url.as_str(),
2232                            "provider_id": provider_id.as_deref(),
2233                            "error": format_reqwest_error_for_retry_chain(&e),
2234                        }));
2235                        if retry_failover {
2236                            lb.record_result_with_backoff(
2237                                selected.index,
2238                                false,
2239                                crate::lb::COOLDOWN_SECS,
2240                                cooldown_backoff,
2241                            );
2242                        }
2243                        let err_str = format_reqwest_error_for_retry_chain(&e);
2244                        upstream_chain.push(format!(
2245                            "{}:{} (idx={}) body_read_error={} model={}",
2246                            selected.config_name,
2247                            selected.upstream.base_url,
2248                            selected.index,
2249                            err_str,
2250                            model_note.as_str()
2251                        ));
2252                        let can_retry = attempt_index + 1 < retry_opt.max_attempts
2253                            && should_retry_class(&retry_opt, Some("upstream_transport_error"));
2254                        if can_retry {
2255                            if retry_failover {
2256                                lb.penalize_with_backoff(
2257                                    selected.index,
2258                                    retry_opt.transport_cooldown_secs,
2259                                    "upstream_body_read_error",
2260                                    cooldown_backoff,
2261                                );
2262                                avoid
2263                                    .entry(selected.config_name.clone())
2264                                    .or_default()
2265                                    .insert(selected.index);
2266                            }
2267                            backoff_sleep(&retry_opt, attempt_index).await;
2268                            continue;
2269                        }
2270
2271                        // Same reasoning as transport errors: without in-request retries, external retries
2272                        // should not get stuck repeatedly selecting the same broken upstream.
2273                        if should_retry_class(&retry_opt, Some("upstream_transport_error")) {
2274                            lb.penalize_with_backoff(
2275                                selected.index,
2276                                retry_opt.transport_cooldown_secs,
2277                                "upstream_body_read_error_final",
2278                                cooldown_backoff,
2279                            );
2280                        }
2281
2282                        let dur = start.elapsed().as_millis() as u64;
2283                        let status = StatusCode::BAD_GATEWAY;
2284                        let http_debug = if should_include_http_warn(status.as_u16())
2285                            && let Some(b) = debug_base.as_ref()
2286                        {
2287                            Some(HttpDebugLog {
2288                            request_body_len: Some(b.request_body_len),
2289                            upstream_request_body_len: Some(b.upstream_request_body_len),
2290                            upstream_headers_ms: Some(upstream_headers_ms),
2291                            upstream_first_chunk_ms: None,
2292                            upstream_body_read_ms: None,
2293                            upstream_error_class: Some("upstream_transport_error".to_string()),
2294                            upstream_error_hint: Some(
2295                                "读取上游响应 body 失败(连接中断/解码错误等);可视为传输错误。"
2296                                    .to_string(),
2297                            ),
2298                            upstream_cf_ray: None,
2299                            client_uri: b.client_uri.clone(),
2300                            target_url: b.target_url.clone(),
2301                            client_headers: b.client_headers.clone(),
2302                            upstream_request_headers: b.upstream_request_headers.clone(),
2303                            auth_resolution: b.auth_resolution.clone(),
2304                            client_body: b.client_body_warn.clone(),
2305                            upstream_request_body: b.upstream_request_body_warn.clone(),
2306                            upstream_response_headers: Some(header_map_to_entries(&resp_headers)),
2307                            upstream_response_body: None,
2308                            upstream_error: Some(err_str.clone()),
2309                        })
2310                        } else {
2311                            None
2312                        };
2313                        log_request_with_debug(
2314                            proxy.service_name,
2315                            method.as_str(),
2316                            uri.path(),
2317                            status.as_u16(),
2318                            dur,
2319                            Some(upstream_headers_ms),
2320                            &selected.config_name,
2321                            selected.upstream.tags.get("provider_id").cloned(),
2322                            &selected.upstream.base_url,
2323                            session_id.clone(),
2324                            cwd.clone(),
2325                            effective_effort.clone(),
2326                            None,
2327                            retry_info_for_chain(&upstream_chain),
2328                            http_debug,
2329                        );
2330                        let retry = retry_info_for_chain(&upstream_chain);
2331                        proxy
2332                            .state
2333                            .finish_request(crate::state::FinishRequestParams {
2334                                id: request_id,
2335                                status_code: status.as_u16(),
2336                                duration_ms: dur,
2337                                ended_at_ms: started_at_ms + dur,
2338                                usage: None,
2339                                retry,
2340                                ttfb_ms: Some(upstream_headers_ms),
2341                            })
2342                            .await;
2343                        return Err((status, err_str));
2344                    }
2345                };
2346                let upstream_body_read_ms = upstream_start.elapsed().as_millis() as u64;
2347                let dur = start.elapsed().as_millis() as u64;
2348                let usage = extract_usage_from_bytes(&bytes);
2349                let status_code = status.as_u16();
2350                let (cls, hint, cf_ray) =
2351                    classify_upstream_response(status_code, &resp_headers, bytes.as_ref());
2352                let never_retry = should_never_retry_status(&retry_opt, status_code)
2353                    || should_never_retry_class(&retry_opt, cls.as_deref());
2354
2355                upstream_chain.push(format!(
2356                    "{} (idx={}) status={} class={} model={}",
2357                    selected.upstream.base_url,
2358                    selected.index,
2359                    status_code,
2360                    cls.as_deref().unwrap_or("-"),
2361                    model_note.as_str()
2362                ));
2363
2364                // If this looks like a transient / retryable upstream failure, but we have no remaining
2365                // in-request retries, proactively cool down this upstream so the next external retry
2366                // (Codex/app-level) can fail over to other upstreams.
2367                if !success
2368                    && attempt_index + 1 >= retry_opt.max_attempts
2369                    && !never_retry
2370                    && (should_retry_status(&retry_opt, status_code)
2371                        || should_retry_class(&retry_opt, cls.as_deref()))
2372                {
2373                    log_retry_trace(serde_json::json!({
2374                        "event": "attempt_final_retryable_failure",
2375                        "service": proxy.service_name,
2376                        "request_id": request_id,
2377                        "attempt": attempt_index + 1,
2378                        "max_attempts": retry_opt.max_attempts,
2379                        "status_code": status_code,
2380                        "class": cls.as_deref(),
2381                        "hint": hint.as_deref(),
2382                        "cf_ray": cf_ray.as_deref(),
2383                        "config_name": selected.config_name.as_str(),
2384                        "upstream_index": selected.index,
2385                        "upstream_base_url": selected.upstream.base_url.as_str(),
2386                        "provider_id": provider_id.as_deref(),
2387                        "should_retry_status": should_retry_status(&retry_opt, status_code),
2388                        "should_retry_class": should_retry_class(&retry_opt, cls.as_deref()),
2389                        "never_retry_status": should_never_retry_status(&retry_opt, status_code),
2390                        "never_retry_class": should_never_retry_class(&retry_opt, cls.as_deref()),
2391                    }));
2392                    match cls.as_deref() {
2393                        Some("cloudflare_challenge") => lb.penalize_with_backoff(
2394                            selected.index,
2395                            retry_opt.cloudflare_challenge_cooldown_secs,
2396                            "cloudflare_challenge_final",
2397                            cooldown_backoff,
2398                        ),
2399                        Some("cloudflare_timeout") => lb.penalize_with_backoff(
2400                            selected.index,
2401                            retry_opt.cloudflare_timeout_cooldown_secs,
2402                            "cloudflare_timeout_final",
2403                            cooldown_backoff,
2404                        ),
2405                        _ if status_code >= 400 => lb.penalize_with_backoff(
2406                            selected.index,
2407                            retry_opt.transport_cooldown_secs,
2408                            &format!("status_{}_final", status_code),
2409                            cooldown_backoff,
2410                        ),
2411                        _ => {}
2412                    }
2413                }
2414
2415                let retryable = !status.is_success()
2416                    && attempt_index + 1 < retry_opt.max_attempts
2417                    && !never_retry
2418                    && (should_retry_status(&retry_opt, status_code)
2419                        || should_retry_class(&retry_opt, cls.as_deref()));
2420                if retryable {
2421                    log_retry_trace(serde_json::json!({
2422                        "event": "attempt_retryable_failure",
2423                        "service": proxy.service_name,
2424                        "request_id": request_id,
2425                        "attempt": attempt_index + 1,
2426                        "next_attempt": attempt_index + 2,
2427                        "max_attempts": retry_opt.max_attempts,
2428                        "status_code": status_code,
2429                        "class": cls.as_deref(),
2430                        "hint": hint.as_deref(),
2431                        "cf_ray": cf_ray.as_deref(),
2432                        "config_name": selected.config_name.as_str(),
2433                        "upstream_index": selected.index,
2434                        "upstream_base_url": selected.upstream.base_url.as_str(),
2435                        "provider_id": provider_id.as_deref(),
2436                        "strategy": if retry_failover { "failover" } else { "same_upstream" },
2437                        "retry_after": resp_headers.get("retry-after").and_then(|v| v.to_str().ok()),
2438                        "should_retry_status": should_retry_status(&retry_opt, status_code),
2439                        "should_retry_class": should_retry_class(&retry_opt, cls.as_deref()),
2440                        "never_retry_status": should_never_retry_status(&retry_opt, status_code),
2441                        "never_retry_class": should_never_retry_class(&retry_opt, cls.as_deref()),
2442                    }));
2443                    let cls_s = cls.as_deref().unwrap_or("-");
2444                    info!(
2445                        "retrying after non-2xx status {} (class={}) for {} {} (config: {}, mode={}, next_attempt={}/{})",
2446                        status_code,
2447                        cls_s,
2448                        method,
2449                        uri.path(),
2450                        selected.config_name,
2451                        if retry_failover {
2452                            "failover"
2453                        } else {
2454                            "same_upstream"
2455                        },
2456                        attempt_index + 2,
2457                        retry_opt.max_attempts
2458                    );
2459                    if retry_failover {
2460                        // Treat retryable 5xx / WAF-like responses as upstream failures for LB tracking.
2461                        if status_code >= 500 || cls.is_some() {
2462                            lb.record_result_with_backoff(
2463                                selected.index,
2464                                false,
2465                                crate::lb::COOLDOWN_SECS,
2466                                cooldown_backoff,
2467                            );
2468                        }
2469                        match cls.as_deref() {
2470                            Some("cloudflare_challenge") => lb.penalize_with_backoff(
2471                                selected.index,
2472                                retry_opt.cloudflare_challenge_cooldown_secs,
2473                                "cloudflare_challenge",
2474                                cooldown_backoff,
2475                            ),
2476                            Some("cloudflare_timeout") => lb.penalize_with_backoff(
2477                                selected.index,
2478                                retry_opt.cloudflare_timeout_cooldown_secs,
2479                                "cloudflare_timeout",
2480                                cooldown_backoff,
2481                            ),
2482                            _ if status_code >= 400 => lb.penalize_with_backoff(
2483                                selected.index,
2484                                retry_opt.transport_cooldown_secs,
2485                                &format!("status_{}", status_code),
2486                                cooldown_backoff,
2487                            ),
2488                            _ => {}
2489                        }
2490                        avoid
2491                            .entry(selected.config_name.clone())
2492                            .or_default()
2493                            .insert(selected.index);
2494                    }
2495                    let skip_sleep = status_code >= 400 && status_code < 500 && status_code != 429;
2496                    if !skip_sleep {
2497                        retry_sleep(&retry_opt, attempt_index, &resp_headers).await;
2498                    }
2499                    continue;
2500                }
2501
2502                // Update LB state (final attempt):
2503                // - 2xx => success
2504                // - transport / 5xx / classified WAF failures => failure
2505                // - generic 3xx/4xx => neutral (do not mark upstream good/bad to avoid sticky routing to a failing upstream,
2506                //   and also avoid penalizing upstreams for client-side mistakes).
2507                if success {
2508                    lb.record_result_with_backoff(
2509                        selected.index,
2510                        true,
2511                        crate::lb::COOLDOWN_SECS,
2512                        cooldown_backoff,
2513                    );
2514                } else if status_code >= 500 || cls.is_some() {
2515                    lb.record_result_with_backoff(
2516                        selected.index,
2517                        false,
2518                        crate::lb::COOLDOWN_SECS,
2519                        cooldown_backoff,
2520                    );
2521                }
2522
2523                let retry = retry_info_for_chain(&upstream_chain);
2524
2525                if is_user_turn {
2526                    let provider_id = selected
2527                        .upstream
2528                        .tags
2529                        .get("provider_id")
2530                        .map(|s| s.as_str())
2531                        .unwrap_or("-");
2532                    info!(
2533                        "user turn {} {} using config '{}' upstream[{}] provider_id='{}' base_url='{}'",
2534                        method,
2535                        uri.path(),
2536                        selected.config_name,
2537                        selected.index,
2538                        provider_id,
2539                        selected.upstream.base_url
2540                    );
2541                }
2542
2543                let http_debug_warn = if should_include_http_warn(status_code)
2544                    && let Some(b) = debug_base.as_ref()
2545                {
2546                    let max = b.warn_max_body_bytes;
2547                    let resp_ct = resp_headers
2548                        .get("content-type")
2549                        .and_then(|v| v.to_str().ok());
2550                    Some(HttpDebugLog {
2551                        request_body_len: Some(b.request_body_len),
2552                        upstream_request_body_len: Some(b.upstream_request_body_len),
2553                        upstream_headers_ms: Some(upstream_headers_ms),
2554                        upstream_first_chunk_ms: None,
2555                        upstream_body_read_ms: Some(upstream_body_read_ms),
2556                        upstream_error_class: cls.clone(),
2557                        upstream_error_hint: hint.clone(),
2558                        upstream_cf_ray: cf_ray.clone(),
2559                        client_uri: b.client_uri.clone(),
2560                        target_url: b.target_url.clone(),
2561                        client_headers: b.client_headers.clone(),
2562                        upstream_request_headers: b.upstream_request_headers.clone(),
2563                        auth_resolution: b.auth_resolution.clone(),
2564                        client_body: b.client_body_warn.clone(),
2565                        upstream_request_body: b.upstream_request_body_warn.clone(),
2566                        upstream_response_headers: Some(header_map_to_entries(&resp_headers)),
2567                        upstream_response_body: Some(make_body_preview(
2568                            bytes.as_ref(),
2569                            resp_ct,
2570                            max,
2571                        )),
2572                        upstream_error: None,
2573                    })
2574                } else {
2575                    None
2576                };
2577
2578                if !status.is_success() {
2579                    if let Some(h) = http_debug_warn.as_ref() {
2580                        warn_http_debug(status_code, h);
2581                    } else {
2582                        let cls_s = cls.as_deref().unwrap_or("-");
2583                        let cf_ray_s = cf_ray.as_deref().unwrap_or("-");
2584                        warn!(
2585                            "upstream returned non-2xx status {} (class={}, cf_ray={}) for {} {} (config: {}); set CODEX_HELPER_HTTP_WARN=0 to disable preview logs (or CODEX_HELPER_HTTP_DEBUG=1 for full debug)",
2586                            status_code,
2587                            cls_s,
2588                            cf_ray_s,
2589                            method,
2590                            uri.path(),
2591                            selected.config_name
2592                        );
2593                    }
2594                }
2595
2596                let http_debug = if should_include_http_debug(status_code) {
2597                    debug_base.map(|b| {
2598                        let max = b.debug_max_body_bytes;
2599                        let resp_ct = resp_headers
2600                            .get("content-type")
2601                            .and_then(|v| v.to_str().ok());
2602                        HttpDebugLog {
2603                            request_body_len: Some(b.request_body_len),
2604                            upstream_request_body_len: Some(b.upstream_request_body_len),
2605                            upstream_headers_ms: Some(upstream_headers_ms),
2606                            upstream_first_chunk_ms: None,
2607                            upstream_body_read_ms: Some(upstream_body_read_ms),
2608                            upstream_error_class: cls,
2609                            upstream_error_hint: hint,
2610                            upstream_cf_ray: cf_ray,
2611                            client_uri: b.client_uri,
2612                            target_url: b.target_url,
2613                            client_headers: b.client_headers,
2614                            upstream_request_headers: b.upstream_request_headers,
2615                            auth_resolution: b.auth_resolution,
2616                            client_body: b.client_body_debug,
2617                            upstream_request_body: b.upstream_request_body_debug,
2618                            upstream_response_headers: Some(header_map_to_entries(&resp_headers)),
2619                            upstream_response_body: Some(make_body_preview(
2620                                bytes.as_ref(),
2621                                resp_ct,
2622                                max,
2623                            )),
2624                            upstream_error: None,
2625                        }
2626                    })
2627                } else if should_include_http_warn(status_code) {
2628                    http_debug_warn.clone()
2629                } else {
2630                    None
2631                };
2632
2633                log_request_with_debug(
2634                    proxy.service_name,
2635                    method.as_str(),
2636                    uri.path(),
2637                    status_code,
2638                    dur,
2639                    Some(upstream_headers_ms),
2640                    &selected.config_name,
2641                    selected.upstream.tags.get("provider_id").cloned(),
2642                    &selected.upstream.base_url,
2643                    session_id.clone(),
2644                    cwd.clone(),
2645                    effective_effort.clone(),
2646                    usage.clone(),
2647                    retry.clone(),
2648                    http_debug,
2649                );
2650                proxy
2651                    .state
2652                    .finish_request(crate::state::FinishRequestParams {
2653                        id: request_id,
2654                        status_code,
2655                        duration_ms: dur,
2656                        ended_at_ms: started_at_ms + dur,
2657                        usage: usage.clone(),
2658                        retry,
2659                        ttfb_ms: Some(upstream_headers_ms),
2660                    })
2661                    .await;
2662
2663                // Poll usage once after a user request finishes (e.g. packycode), used to drive auto-switching.
2664                if is_user_turn && is_codex_service {
2665                    usage_providers::poll_for_codex_upstream(
2666                        cfg_snapshot.clone(),
2667                        proxy.lb_states.clone(),
2668                        &selected.config_name,
2669                        selected.index,
2670                    )
2671                    .await;
2672                }
2673
2674                let mut builder = Response::builder().status(status);
2675                for (name, value) in resp_headers_filtered.iter() {
2676                    builder = builder.header(name, value);
2677                }
2678                return Ok(builder.body(Body::from(bytes)).unwrap());
2679            }
2680        }
2681
2682        let dur = start.elapsed().as_millis() as u64;
2683        let status = StatusCode::BAD_GATEWAY;
2684        let http_debug = if should_include_http_warn(status.as_u16()) {
2685            let client_headers_entries = client_headers_entries_cache
2686                .get_or_init(|| header_map_to_entries(&client_headers))
2687                .clone();
2688            Some(HttpDebugLog {
2689                request_body_len: Some(request_body_len),
2690                upstream_request_body_len: None,
2691                upstream_headers_ms: None,
2692                upstream_first_chunk_ms: None,
2693                upstream_body_read_ms: None,
2694                upstream_error_class: Some("retry_exhausted".to_string()),
2695                upstream_error_hint: Some("所有重试尝试均未能返回可用响应。".to_string()),
2696                upstream_cf_ray: None,
2697                client_uri: uri.to_string(),
2698                target_url: "-".to_string(),
2699                client_headers: client_headers_entries,
2700                upstream_request_headers: Vec::new(),
2701                auth_resolution: None,
2702                client_body: client_body_warn.clone(),
2703                upstream_request_body: None,
2704                upstream_response_headers: None,
2705                upstream_response_body: None,
2706                upstream_error: Some(format!(
2707                    "retry attempts exhausted; chain={:?}",
2708                    upstream_chain
2709                )),
2710            })
2711        } else {
2712            None
2713        };
2714        log_request_with_debug(
2715            proxy.service_name,
2716            method.as_str(),
2717            uri.path(),
2718            status.as_u16(),
2719            dur,
2720            None,
2721            "-",
2722            None,
2723            "-",
2724            session_id.clone(),
2725            cwd.clone(),
2726            effective_effort.clone(),
2727            None,
2728            retry_info_for_chain(&upstream_chain),
2729            http_debug,
2730        );
2731        let retry = retry_info_for_chain(&upstream_chain);
2732        proxy
2733            .state
2734            .finish_request(crate::state::FinishRequestParams {
2735                id: request_id,
2736                status_code: status.as_u16(),
2737                duration_ms: dur,
2738                ended_at_ms: started_at_ms + dur,
2739                usage: None,
2740                retry,
2741                ttfb_ms: None,
2742            })
2743            .await;
2744        Err((status, "retry attempts exhausted".to_string()))
2745    }
2746}
2747
2748pub fn router(proxy: ProxyService) -> Router {
2749    // In axum 0.8, wildcard segments use `/{*path}` (equivalent to `/*path` from axum 0.7).
2750    #[derive(serde::Deserialize)]
    /// JSON request body for the session effort-override handler.
    ///
    /// `set_session_override` rejects a blank `session_id`, stores `effort` when it is
    /// present and non-blank, and clears the session's override when `effort` is absent.
2751    struct SessionOverrideRequest {
        /// Session the override applies to; a blank value is rejected by the handler.
2752        session_id: String,
        /// Effort value to store; `None` clears the session's existing override.
2753        effort: Option<String>,
2754    }
2755
2756    #[derive(serde::Deserialize)]
    /// JSON request body for `set_session_config_override`, which rejects a blank
    /// `session_id`.
2757    struct SessionConfigOverrideRequest {
        /// Session the override applies to; a blank value is rejected by the handler.
2758        session_id: String,
        /// Config name to associate with the session.
        /// NOTE(review): presumably `None` clears the override, mirroring
        /// `SessionOverrideRequest::effort` — confirm against the handler body.
2759        config_name: Option<String>,
2760    }
2761
2762    #[derive(serde::Deserialize)]
    /// JSON request body carrying an optional config name that is not tied to a session.
2763    struct GlobalConfigOverrideRequest {
        /// Config name to apply.
        /// NOTE(review): presumably `None` clears the global override — confirm against
        /// the handler that consumes this type.
2764        config_name: Option<String>,
2765    }
2766
2767    #[derive(serde::Serialize)]
    /// JSON response describing the currently loaded runtime configuration.
    ///
    /// Built by `runtime_config_status` and embedded in `ReloadResult` by
    /// `reload_runtime_config`.
2768    struct RuntimeConfigStatus {
        /// Display form of `crate::config::config_file_path()`.
2769        config_path: String,
        /// Value of `proxy.config.last_loaded_at_ms()`.
2770        loaded_at_ms: u64,
        /// Value of `proxy.config.last_mtime_ms()`; may be absent.
2771        source_mtime_ms: Option<u64>,
        /// Retry settings resolved from the current config snapshot (`cfg.retry.resolve()`).
2772        retry: crate::config::ResolvedRetryConfig,
2773    }
2774
2775    #[derive(serde::Serialize)]
    /// JSON response advertising an API version, a service name and a list of endpoints.
    /// NOTE(review): field meanings below are read off the names — confirm against the
    /// handler that builds this value.
2776    struct ApiCapabilities {
        /// Version number of the API.
2777        api_version: u32,
        /// Name of the service answering the request.
2778        service_name: &'static str,
        /// Endpoint strings exposed to clients (presumably route paths).
2779        endpoints: Vec<&'static str>,
2780    }
2781
2782    #[derive(serde::Serialize)]
    /// JSON response returned by `reload_runtime_config`.
2783    struct ReloadResult {
        /// The boolean returned by `proxy.config.force_reload_from_disk()`.
2784        reloaded: bool,
        /// Status of the configuration as observed right after the reload attempt.
2785        status: RuntimeConfigStatus,
2786    }
2787
2788    async fn runtime_config_status(
2789        proxy: ProxyService,
2790    ) -> Result<Json<RuntimeConfigStatus>, (StatusCode, String)> {
2791        let cfg = proxy.config.snapshot().await;
2792        Ok(Json(RuntimeConfigStatus {
2793            config_path: crate::config::config_file_path().display().to_string(),
2794            loaded_at_ms: proxy.config.last_loaded_at_ms(),
2795            source_mtime_ms: proxy.config.last_mtime_ms().await,
2796            retry: cfg.retry.resolve(),
2797        }))
2798    }
2799
2800    async fn reload_runtime_config(
2801        proxy: ProxyService,
2802    ) -> Result<Json<ReloadResult>, (StatusCode, String)> {
2803        let changed = proxy
2804            .config
2805            .force_reload_from_disk()
2806            .await
2807            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2808        let cfg = proxy.config.snapshot().await;
2809        Ok(Json(ReloadResult {
2810            reloaded: changed,
2811            status: RuntimeConfigStatus {
2812                config_path: crate::config::config_file_path().display().to_string(),
2813                loaded_at_ms: proxy.config.last_loaded_at_ms(),
2814                source_mtime_ms: proxy.config.last_mtime_ms().await,
2815                retry: cfg.retry.resolve(),
2816            },
2817        }))
2818    }
2819
2820    async fn set_session_override(
2821        proxy: ProxyService,
2822        Json(payload): Json<SessionOverrideRequest>,
2823    ) -> Result<StatusCode, (StatusCode, String)> {
2824        if payload.session_id.trim().is_empty() {
2825            return Err((
2826                StatusCode::BAD_REQUEST,
2827                "session_id is required".to_string(),
2828            ));
2829        }
2830        if let Some(effort) = payload.effort {
2831            if effort.trim().is_empty() {
2832                return Err((StatusCode::BAD_REQUEST, "effort is empty".to_string()));
2833            }
2834            proxy
2835                .state
2836                .set_session_effort_override(
2837                    payload.session_id,
2838                    effort,
2839                    std::time::SystemTime::now()
2840                        .duration_since(std::time::UNIX_EPOCH)
2841                        .map(|d| d.as_millis() as u64)
2842                        .unwrap_or(0),
2843                )
2844                .await;
2845        } else {
2846            proxy
2847                .state
2848                .clear_session_effort_override(payload.session_id.as_str())
2849                .await;
2850        }
2851        Ok(StatusCode::NO_CONTENT)
2852    }
2853
2854    async fn list_session_overrides(
2855        proxy: ProxyService,
2856    ) -> Result<Json<std::collections::HashMap<String, String>>, (StatusCode, String)> {
2857        let map = proxy.state.list_session_effort_overrides().await;
2858        Ok(Json(map))
2859    }
2860
2861    async fn set_session_config_override(
2862        proxy: ProxyService,
2863        Json(payload): Json<SessionConfigOverrideRequest>,
2864    ) -> Result<StatusCode, (StatusCode, String)> {
2865        if payload.session_id.trim().is_empty() {
2866            return Err((
2867                StatusCode::BAD_REQUEST,
2868                "session_id is required".to_string(),
2869            ));
2870        }
2871        if let Some(config_name) = payload.config_name {
2872            if config_name.trim().is_empty() {
2873                return Err((StatusCode::BAD_REQUEST, "config_name is empty".to_string()));
2874            }
2875            proxy
2876                .state
2877                .set_session_config_override(
2878                    payload.session_id,
2879                    config_name,
2880                    std::time::SystemTime::now()
2881                        .duration_since(std::time::UNIX_EPOCH)
2882                        .map(|d| d.as_millis() as u64)
2883                        .unwrap_or(0),
2884                )
2885                .await;
2886        } else {
2887            proxy
2888                .state
2889                .clear_session_config_override(payload.session_id.as_str())
2890                .await;
2891        }
2892        Ok(StatusCode::NO_CONTENT)
2893    }
2894
2895    async fn list_session_config_overrides(
2896        proxy: ProxyService,
2897    ) -> Result<Json<std::collections::HashMap<String, String>>, (StatusCode, String)> {
2898        let map = proxy.state.list_session_config_overrides().await;
2899        Ok(Json(map))
2900    }
2901
2902    async fn get_global_config_override(
2903        proxy: ProxyService,
2904    ) -> Result<Json<Option<String>>, (StatusCode, String)> {
2905        Ok(Json(proxy.state.get_global_config_override().await))
2906    }
2907
2908    async fn set_global_config_override(
2909        proxy: ProxyService,
2910        Json(payload): Json<GlobalConfigOverrideRequest>,
2911    ) -> Result<StatusCode, (StatusCode, String)> {
2912        if let Some(config_name) = payload.config_name {
2913            if config_name.trim().is_empty() {
2914                return Err((StatusCode::BAD_REQUEST, "config_name is empty".to_string()));
2915            }
2916            proxy
2917                .state
2918                .set_global_config_override(
2919                    config_name,
2920                    std::time::SystemTime::now()
2921                        .duration_since(std::time::UNIX_EPOCH)
2922                        .map(|d| d.as_millis() as u64)
2923                        .unwrap_or(0),
2924                )
2925                .await;
2926        } else {
2927            proxy.state.clear_global_config_override().await;
2928        }
2929        Ok(StatusCode::NO_CONTENT)
2930    }
2931
2932    async fn list_active_requests(
2933        proxy: ProxyService,
2934    ) -> Result<Json<Vec<ActiveRequest>>, (StatusCode, String)> {
2935        let vec = proxy.state.list_active_requests().await;
2936        Ok(Json(vec))
2937    }
2938
2939    async fn list_session_stats(
2940        proxy: ProxyService,
2941    ) -> Result<
2942        Json<std::collections::HashMap<String, crate::state::SessionStats>>,
2943        (StatusCode, String),
2944    > {
2945        let map = proxy.state.list_session_stats().await;
2946        Ok(Json(map))
2947    }
2948
    /// Query-string parameters accepted by `list_recent_finished`.
    #[derive(serde::Deserialize)]
    struct RecentQuery {
        /// Maximum number of entries to return; the handler defaults a missing value
        /// to 50 and clamps it to `1..=200`.
        limit: Option<usize>,
    }
2953
2954    async fn list_recent_finished(
2955        proxy: ProxyService,
2956        Query(q): Query<RecentQuery>,
2957    ) -> Result<Json<Vec<FinishedRequest>>, (StatusCode, String)> {
2958        let limit = q.limit.unwrap_or(50).clamp(1, 200);
2959        let vec = proxy.state.list_recent_finished(limit).await;
2960        Ok(Json(vec))
2961    }
2962
2963    async fn api_capabilities(
2964        proxy: ProxyService,
2965    ) -> Result<Json<ApiCapabilities>, (StatusCode, String)> {
2966        Ok(Json(ApiCapabilities {
2967            api_version: 1,
2968            service_name: proxy.service_name,
2969            endpoints: vec![
2970                "/__codex_helper/api/v1/capabilities",
2971                "/__codex_helper/api/v1/snapshot",
2972                "/__codex_helper/api/v1/status/active",
2973                "/__codex_helper/api/v1/status/recent",
2974                "/__codex_helper/api/v1/status/session-stats",
2975                "/__codex_helper/api/v1/status/health-checks",
2976                "/__codex_helper/api/v1/status/config-health",
2977                "/__codex_helper/api/v1/config/runtime",
2978                "/__codex_helper/api/v1/config/reload",
2979                "/__codex_helper/api/v1/configs",
2980                "/__codex_helper/api/v1/overrides/session/effort",
2981                "/__codex_helper/api/v1/overrides/session/config",
2982                "/__codex_helper/api/v1/overrides/global-config",
2983                "/__codex_helper/api/v1/healthcheck/start",
2984                "/__codex_helper/api/v1/healthcheck/cancel",
2985            ],
2986        }))
2987    }
2988
    /// Query-string parameters accepted by `api_v1_snapshot`.
    #[derive(serde::Deserialize)]
    struct SnapshotQuery {
        /// Recent-request limit; the handler defaults a missing value to 200 and
        /// clamps it to `1..=2_000`.
        recent_limit: Option<usize>,
        /// Number of days of stats; the handler defaults a missing value to 21 and
        /// clamps it to `1..=365`.
        stats_days: Option<usize>,
    }
2994
2995    async fn api_v1_snapshot(
2996        proxy: ProxyService,
2997        Query(q): Query<SnapshotQuery>,
2998    ) -> Result<Json<crate::dashboard_core::ApiV1Snapshot>, (StatusCode, String)> {
2999        let recent_limit = q.recent_limit.unwrap_or(200).clamp(1, 2_000);
3000        let stats_days = q.stats_days.unwrap_or(21).clamp(1, 365);
3001
3002        let cfg = proxy.config.snapshot().await;
3003        let mgr = match proxy.service_name {
3004            "claude" => &cfg.claude,
3005            _ => &cfg.codex,
3006        };
3007        let mut configs = mgr
3008            .configs
3009            .iter()
3010            .map(|(name, c)| crate::dashboard_core::ConfigOption {
3011                name: name.clone(),
3012                alias: c.alias.clone(),
3013                enabled: c.enabled,
3014                level: c.level.clamp(1, 10),
3015            })
3016            .collect::<Vec<_>>();
3017        configs.sort_by(|a, b| a.level.cmp(&b.level).then_with(|| a.name.cmp(&b.name)));
3018
3019        let snapshot = crate::dashboard_core::build_dashboard_snapshot(
3020            &proxy.state,
3021            proxy.service_name,
3022            recent_limit,
3023            stats_days,
3024        )
3025        .await;
3026
3027        Ok(Json(crate::dashboard_core::ApiV1Snapshot {
3028            api_version: 1,
3029            service_name: proxy.service_name.to_string(),
3030            runtime_loaded_at_ms: Some(proxy.config.last_loaded_at_ms()),
3031            runtime_source_mtime_ms: proxy.config.last_mtime_ms().await,
3032            configs,
3033            snapshot,
3034        }))
3035    }
3036
3037    async fn list_health_checks(
3038        proxy: ProxyService,
3039    ) -> Result<
3040        Json<std::collections::HashMap<String, crate::state::HealthCheckStatus>>,
3041        (StatusCode, String),
3042    > {
3043        let map = proxy.state.list_health_checks(proxy.service_name).await;
3044        Ok(Json(map))
3045    }
3046
3047    async fn list_config_health(
3048        proxy: ProxyService,
3049    ) -> Result<
3050        Json<std::collections::HashMap<String, crate::state::ConfigHealth>>,
3051        (StatusCode, String),
3052    > {
3053        let map = proxy.state.get_config_health(proxy.service_name).await;
3054        Ok(Json(map))
3055    }
3056
    /// Request body shared by `start_health_checks` and `cancel_health_checks`.
    ///
    /// Both fields fall back to their `Default` value when absent from the JSON.
    #[derive(Debug, serde::Deserialize)]
    struct HealthCheckAction {
        /// When `true`, the handlers target every config of the active service and
        /// ignore `config_names`.
        #[serde(default)]
        all: bool,
        /// Explicit config names to target; only consulted when `all` is `false`.
        #[serde(default)]
        config_names: Vec<String>,
    }
3064
    /// Response body shared by `start_health_checks` and `cancel_health_checks`.
    ///
    /// Each handler fills only the lists relevant to it and leaves the rest empty.
    #[derive(Debug, serde::Serialize)]
    struct HealthCheckActionResult {
        /// Configs for which a health-check task was spawned (start only).
        started: Vec<String>,
        /// Configs for which `try_begin_health_check` returned `false` (start only).
        already_running: Vec<String>,
        /// Requested names that are not present in the service's config map.
        missing: Vec<String>,
        /// Configs for which `request_cancel_health_check` returned `true` (cancel only).
        cancel_requested: Vec<String>,
        /// Configs for which `request_cancel_health_check` returned `false` (cancel only).
        not_running: Vec<String>,
    }
3073
3074    fn now_ms() -> u64 {
3075        std::time::SystemTime::now()
3076            .duration_since(std::time::UNIX_EPOCH)
3077            .map(|d| d.as_millis() as u64)
3078            .unwrap_or(0)
3079    }
3080
3081    async fn start_health_checks(
3082        proxy: ProxyService,
3083        Json(payload): Json<HealthCheckAction>,
3084    ) -> Result<Json<HealthCheckActionResult>, (StatusCode, String)> {
3085        let cfg = proxy.config.snapshot().await;
3086        let mgr = match proxy.service_name {
3087            "claude" => &cfg.claude,
3088            _ => &cfg.codex,
3089        };
3090
3091        let mut targets = if payload.all {
3092            mgr.configs.keys().cloned().collect::<Vec<_>>()
3093        } else {
3094            payload.config_names
3095        };
3096        targets.retain(|s| !s.trim().is_empty());
3097        targets.sort();
3098        targets.dedup();
3099        if targets.is_empty() {
3100            return Err((
3101                StatusCode::BAD_REQUEST,
3102                "expected { all: true } or non-empty config_names".to_string(),
3103            ));
3104        }
3105
3106        let mut started = Vec::new();
3107        let mut already_running = Vec::new();
3108        let mut missing = Vec::new();
3109        for name in targets {
3110            let Some(svc) = mgr.configs.get(&name) else {
3111                missing.push(name);
3112                continue;
3113            };
3114
3115            let upstreams = svc.upstreams.clone();
3116            let now = now_ms();
3117            if !proxy
3118                .state
3119                .try_begin_health_check(proxy.service_name, &name, upstreams.len(), now)
3120                .await
3121            {
3122                already_running.push(name);
3123                continue;
3124            }
3125
3126            proxy
3127                .state
3128                .record_config_health(
3129                    proxy.service_name,
3130                    name.clone(),
3131                    crate::state::ConfigHealth {
3132                        checked_at_ms: now,
3133                        upstreams: Vec::new(),
3134                    },
3135                )
3136                .await;
3137
3138            let state = proxy.state.clone();
3139            let service_name = proxy.service_name;
3140            let config_name = name.clone();
3141            tokio::spawn(async move {
3142                crate::healthcheck::run_health_check_for_config(
3143                    state,
3144                    service_name,
3145                    config_name,
3146                    upstreams,
3147                )
3148                .await;
3149            });
3150            started.push(name);
3151        }
3152
3153        Ok(Json(HealthCheckActionResult {
3154            started,
3155            already_running,
3156            missing,
3157            cancel_requested: Vec::new(),
3158            not_running: Vec::new(),
3159        }))
3160    }
3161
3162    async fn cancel_health_checks(
3163        proxy: ProxyService,
3164        Json(payload): Json<HealthCheckAction>,
3165    ) -> Result<Json<HealthCheckActionResult>, (StatusCode, String)> {
3166        let cfg = proxy.config.snapshot().await;
3167        let mgr = match proxy.service_name {
3168            "claude" => &cfg.claude,
3169            _ => &cfg.codex,
3170        };
3171
3172        let mut targets = if payload.all {
3173            mgr.configs.keys().cloned().collect::<Vec<_>>()
3174        } else {
3175            payload.config_names
3176        };
3177        targets.retain(|s| !s.trim().is_empty());
3178        targets.sort();
3179        targets.dedup();
3180        if targets.is_empty() {
3181            return Err((
3182                StatusCode::BAD_REQUEST,
3183                "expected { all: true } or non-empty config_names".to_string(),
3184            ));
3185        }
3186
3187        let now = now_ms();
3188        let mut cancel_requested = Vec::new();
3189        let mut not_running = Vec::new();
3190        let mut missing = Vec::new();
3191        for name in targets {
3192            if !mgr.configs.contains_key(&name) {
3193                missing.push(name);
3194                continue;
3195            }
3196            let ok = proxy
3197                .state
3198                .request_cancel_health_check(proxy.service_name, &name, now)
3199                .await;
3200            if ok {
3201                cancel_requested.push(name);
3202            } else {
3203                not_running.push(name);
3204            }
3205        }
3206
3207        Ok(Json(HealthCheckActionResult {
3208            started: Vec::new(),
3209            already_running: Vec::new(),
3210            missing,
3211            cancel_requested,
3212            not_running,
3213        }))
3214    }
3215
3216    async fn list_configs(
3217        proxy: ProxyService,
3218    ) -> Result<Json<Vec<crate::dashboard_core::ConfigOption>>, (StatusCode, String)> {
3219        let cfg = proxy.config.snapshot().await;
3220        let mgr = match proxy.service_name {
3221            "claude" => &cfg.claude,
3222            _ => &cfg.codex,
3223        };
3224        let mut out = mgr
3225            .configs
3226            .iter()
3227            .map(|(name, c)| crate::dashboard_core::ConfigOption {
3228                name: name.clone(),
3229                alias: c.alias.clone(),
3230                enabled: c.enabled,
3231                level: c.level.clamp(1, 10),
3232            })
3233            .collect::<Vec<_>>();
3234        out.sort_by(|a, b| a.level.cmp(&b.level).then_with(|| a.name.cmp(&b.name)));
3235        Ok(Json(out))
3236    }
3237
3238    let p0 = proxy.clone();
3239    let p1 = proxy.clone();
3240    let p2 = proxy.clone();
3241    let p3 = proxy.clone();
3242    let p4 = proxy.clone();
3243    let p5 = proxy.clone();
3244    let p6 = proxy.clone();
3245    let p7 = proxy.clone();
3246    let p8 = proxy.clone();
3247    let p9 = proxy.clone();
3248    let p10 = proxy.clone();
3249    let p11 = proxy.clone();
3250    let p12 = proxy.clone();
3251    let p13 = proxy.clone();
3252    let p14 = proxy.clone();
3253    let p15 = proxy.clone();
3254    let p16 = proxy.clone();
3255    let p17 = proxy.clone();
3256    let p18 = proxy.clone();
3257    let p19 = proxy.clone();
3258    let p20 = proxy.clone();
3259    let p21 = proxy.clone();
3260    let p22 = proxy.clone();
3261    let p23 = proxy.clone();
3262    let p24 = proxy.clone();
3263    let p25 = proxy.clone();
3264
3265    Router::new()
3266        // Versioned API (v1): attach-friendly, safe-by-default (no secrets).
3267        .route(
3268            "/__codex_helper/api/v1/capabilities",
3269            get(move || api_capabilities(p8.clone())),
3270        )
3271        .route(
3272            "/__codex_helper/api/v1/snapshot",
3273            get(move |q| api_v1_snapshot(p25.clone(), q)),
3274        )
3275        .route(
3276            "/__codex_helper/api/v1/status/active",
3277            get(move || list_active_requests(p9.clone())),
3278        )
3279        .route(
3280            "/__codex_helper/api/v1/status/recent",
3281            get(move |q| list_recent_finished(p10.clone(), q)),
3282        )
3283        .route(
3284            "/__codex_helper/api/v1/status/session-stats",
3285            get(move || list_session_stats(p11.clone())),
3286        )
3287        .route(
3288            "/__codex_helper/api/v1/status/health-checks",
3289            get(move || list_health_checks(p21.clone())),
3290        )
3291        .route(
3292            "/__codex_helper/api/v1/status/config-health",
3293            get(move || list_config_health(p22.clone())),
3294        )
3295        .route(
3296            "/__codex_helper/api/v1/config/runtime",
3297            get(move || runtime_config_status(p12.clone())),
3298        )
3299        .route(
3300            "/__codex_helper/api/v1/config/reload",
3301            post(move || reload_runtime_config(p13.clone())),
3302        )
3303        .route(
3304            "/__codex_helper/api/v1/configs",
3305            get(move || list_configs(p14.clone())),
3306        )
3307        .route(
3308            "/__codex_helper/api/v1/overrides/session/effort",
3309            get(move || list_session_overrides(p15.clone()))
3310                .post(move |payload| set_session_override(p16.clone(), payload)),
3311        )
3312        .route(
3313            "/__codex_helper/api/v1/overrides/session/config",
3314            get(move || list_session_config_overrides(p17.clone()))
3315                .post(move |payload| set_session_config_override(p18.clone(), payload)),
3316        )
3317        .route(
3318            "/__codex_helper/api/v1/overrides/global-config",
3319            get(move || get_global_config_override(p19.clone()))
3320                .post(move |payload| set_global_config_override(p20.clone(), payload)),
3321        )
3322        .route(
3323            "/__codex_helper/api/v1/healthcheck/start",
3324            post(move |payload| start_health_checks(p23.clone(), payload)),
3325        )
3326        .route(
3327            "/__codex_helper/api/v1/healthcheck/cancel",
3328            post(move |payload| cancel_health_checks(p24.clone(), payload)),
3329        )
3330        .route(
3331            "/__codex_helper/override/session",
3332            get(move || list_session_overrides(p0.clone()))
3333                .post(move |payload| set_session_override(p1.clone(), payload)),
3334        )
3335        .route(
3336            "/__codex_helper/config/runtime",
3337            get(move || runtime_config_status(p5.clone())),
3338        )
3339        .route(
3340            "/__codex_helper/config/reload",
3341            get(move || runtime_config_status(p6.clone()))
3342                .post(move || reload_runtime_config(p7.clone())),
3343        )
3344        .route(
3345            "/__codex_helper/status/active",
3346            get(move || list_active_requests(p3.clone())),
3347        )
3348        .route(
3349            "/__codex_helper/status/recent",
3350            get(move |q| list_recent_finished(p4.clone(), q)),
3351        )
3352        .route("/{*path}", any(move |req| handle_proxy(p2.clone(), req)))
3353}