1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex};
3use std::time::Instant;
4
5use anyhow::{Result, anyhow};
6use axum::Json;
7use axum::Router;
8use axum::body::{Body, Bytes, to_bytes};
9use axum::extract::Query;
10use axum::http::{HeaderMap, HeaderName, HeaderValue, Method, Request, Response, StatusCode, Uri};
11use axum::routing::{any, get, post};
12use reqwest::Client;
13use std::sync::OnceLock;
14use tracing::{instrument, warn};
15
16mod classify;
17mod retry;
18mod runtime_config;
19mod stream;
20#[cfg(test)]
21mod tests;
22
23use crate::config::{ProxyConfig, RetryStrategy, ServiceConfigManager};
24use crate::filter::RequestFilter;
25use crate::lb::{LbState, LoadBalancer, SelectedUpstream};
26use crate::logging::{
27 AuthResolutionLog, BodyPreview, HeaderEntry, HttpDebugLog, http_debug_options,
28 http_warn_options, log_request_with_debug, log_retry_trace, make_body_preview,
29 should_include_http_warn, should_log_request_body_preview,
30};
31use crate::model_routing;
32use crate::state::{ActiveRequest, FinishedRequest, ProxyState};
33use crate::usage::extract_usage_from_bytes;
34
35use self::classify::classify_upstream_response;
36use self::retry::{
37 backoff_sleep, retry_info_for_chain, retry_plan, retry_sleep, should_never_retry,
38 should_retry_class, should_retry_status,
39};
40use self::runtime_config::RuntimeConfig;
41use self::stream::{SseSuccessMeta, build_sse_success_response};
42
43fn format_reqwest_error_for_retry_chain(e: &reqwest::Error) -> String {
44 use std::error::Error as _;
45
46 let mut parts: Vec<String> = Vec::new();
47 let first = e.to_string();
48 if !first.trim().is_empty() {
49 parts.push(first);
50 }
51
52 let mut cur = e.source();
53 for _ in 0..4 {
54 let Some(src) = cur else { break };
55 let msg = src.to_string();
56 if !msg.trim().is_empty() && !parts.iter().any(|x| x == &msg) {
57 parts.push(msg);
58 }
59 cur = src.source();
60 }
61
62 let mut flags: Vec<&'static str> = Vec::new();
63 if e.is_timeout() {
64 flags.push("timeout");
65 }
66 if e.is_connect() {
67 flags.push("connect");
68 }
69
70 let mut out = if parts.is_empty() {
71 "reqwest error".to_string()
72 } else {
73 parts.join(" | caused_by: ")
74 };
75 if !flags.is_empty() {
76 out.push_str(" (flags: ");
77 out.push_str(&flags.join(","));
78 out.push(')');
79 }
80 out = out.replace(['\r', '\n'], " ");
81 const MAX_LEN: usize = 360;
82 if out.len() > MAX_LEN {
83 out.truncate(MAX_LEN);
84 out.push('…');
85 }
86 out
87}
88
89#[allow(dead_code)]
90fn lb_state_snapshot_json(lb: &LoadBalancer) -> Option<serde_json::Value> {
91 let map = match lb.states.lock() {
92 Ok(m) => m,
93 Err(e) => e.into_inner(),
94 };
95 let st = map.get(&lb.service.name)?;
96 let now = std::time::Instant::now();
97 let upstreams = (0..lb.service.upstreams.len())
98 .map(|idx| {
99 let cooldown_remaining_ms = st
100 .cooldown_until
101 .get(idx)
102 .and_then(|x| *x)
103 .map(|until| until.saturating_duration_since(now).as_millis() as u64)
104 .filter(|&ms| ms > 0);
105 serde_json::json!({
106 "idx": idx,
107 "failure_count": st.failure_counts.get(idx).copied(),
108 "penalty_streak": st.penalty_streak.get(idx).copied(),
109 "usage_exhausted": st.usage_exhausted.get(idx).copied(),
110 "cooldown_remaining_ms": cooldown_remaining_ms,
111 })
112 })
113 .collect::<Vec<_>>();
114 Some(serde_json::json!({
115 "last_good_index": st.last_good_index,
116 "upstreams": upstreams,
117 }))
118}
119
120fn read_json_file(path: &std::path::Path) -> Option<serde_json::Value> {
121 let bytes = std::fs::read(path).ok()?;
122 let text = String::from_utf8_lossy(&bytes);
123 if text.trim().is_empty() {
124 return None;
125 }
126 serde_json::from_str(&text).ok()
127}
128
129fn codex_auth_json_value(key: &str) -> Option<String> {
130 static CACHE: std::sync::OnceLock<Option<serde_json::Value>> = std::sync::OnceLock::new();
131 let v = CACHE.get_or_init(|| read_json_file(&crate::config::codex_auth_path()));
132 let obj = v.as_ref()?.as_object()?;
133 obj.get(key).and_then(|x| x.as_str()).map(|s| s.to_string())
134}
135
136fn claude_settings_env_value(key: &str) -> Option<String> {
137 static CACHE: std::sync::OnceLock<Option<serde_json::Value>> = std::sync::OnceLock::new();
138 let v = CACHE.get_or_init(|| read_json_file(&crate::config::claude_settings_path()));
139 let obj = v.as_ref()?.as_object()?;
140 let env_obj = obj.get("env")?.as_object()?;
141 env_obj
142 .get(key)
143 .and_then(|x| x.as_str())
144 .map(|s| s.to_string())
145}
146
147fn resolve_auth_token_with_source(
148 service_name: &str,
149 auth: &crate::config::UpstreamAuth,
150 client_has_auth: bool,
151) -> (Option<String>, String) {
152 if let Some(token) = auth.auth_token.as_deref()
153 && !token.trim().is_empty()
154 {
155 return (Some(token.to_string()), "inline".to_string());
156 }
157
158 if let Some(env_name) = auth.auth_token_env.as_deref()
159 && !env_name.trim().is_empty()
160 {
161 if let Ok(v) = std::env::var(env_name)
162 && !v.trim().is_empty()
163 {
164 return (Some(v), format!("env:{env_name}"));
165 }
166
167 let file_value = match service_name {
168 "codex" => codex_auth_json_value(env_name),
169 "claude" => claude_settings_env_value(env_name),
170 _ => None,
171 };
172 if let Some(v) = file_value
173 && !v.trim().is_empty()
174 {
175 let src = match service_name {
176 "codex" => format!("codex_auth_json:{env_name}"),
177 "claude" => format!("claude_settings_env:{env_name}"),
178 _ => format!("file:{env_name}"),
179 };
180 return (Some(v), src);
181 }
182
183 if client_has_auth {
184 return (None, format!("client_passthrough (missing_env:{env_name})"));
185 }
186 return (None, format!("missing_env:{env_name}"));
187 }
188
189 if client_has_auth {
190 (None, "client_passthrough".to_string())
191 } else {
192 (None, "none".to_string())
193 }
194}
195
196fn resolve_api_key_with_source(
197 service_name: &str,
198 auth: &crate::config::UpstreamAuth,
199 client_has_x_api_key: bool,
200) -> (Option<String>, String) {
201 if let Some(key) = auth.api_key.as_deref()
202 && !key.trim().is_empty()
203 {
204 return (Some(key.to_string()), "inline".to_string());
205 }
206
207 if let Some(env_name) = auth.api_key_env.as_deref()
208 && !env_name.trim().is_empty()
209 {
210 if let Ok(v) = std::env::var(env_name)
211 && !v.trim().is_empty()
212 {
213 return (Some(v), format!("env:{env_name}"));
214 }
215
216 let file_value = match service_name {
217 "codex" => codex_auth_json_value(env_name),
218 "claude" => claude_settings_env_value(env_name),
219 _ => None,
220 };
221 if let Some(v) = file_value
222 && !v.trim().is_empty()
223 {
224 let src = match service_name {
225 "codex" => format!("codex_auth_json:{env_name}"),
226 "claude" => format!("claude_settings_env:{env_name}"),
227 _ => format!("file:{env_name}"),
228 };
229 return (Some(v), src);
230 }
231
232 if client_has_x_api_key {
233 return (None, format!("client_passthrough (missing_env:{env_name})"));
234 }
235 return (None, format!("missing_env:{env_name}"));
236 }
237
238 if client_has_x_api_key {
239 (None, "client_passthrough".to_string())
240 } else {
241 (None, "none".to_string())
242 }
243}
244
/// Reports whether `name_lower` is one of the fixed hop-by-hop header names.
///
/// The comparison is exact, so the caller must pass an already lowercased name.
fn is_hop_by_hop_header(name_lower: &str) -> bool {
    const HOP_BY_HOP: [&str; 9] = [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "trailers",
        "transfer-encoding",
        "upgrade",
    ];
    HOP_BY_HOP.iter().any(|known| *known == name_lower)
}
259
260fn hop_by_hop_connection_tokens(headers: &HeaderMap) -> Vec<String> {
261 let mut out = Vec::new();
262 for value in headers.get_all("connection").iter() {
263 let Ok(s) = value.to_str() else {
264 continue;
265 };
266 for token in s.split(',').map(|t| t.trim()).filter(|t| !t.is_empty()) {
267 out.push(token.to_ascii_lowercase());
268 }
269 }
270 out
271}
272
273fn filter_request_headers(src: &HeaderMap) -> HeaderMap {
274 let extra = hop_by_hop_connection_tokens(src);
275 let mut out = HeaderMap::new();
276 for (name, value) in src.iter() {
277 let name_lower = name.as_str().to_ascii_lowercase();
278 if name_lower == "host"
279 || name_lower == "content-length"
280 || is_hop_by_hop_header(&name_lower)
281 {
282 continue;
283 }
284 if extra.iter().any(|t| t == &name_lower) {
285 continue;
286 }
287 out.append(name.clone(), value.clone());
288 }
289 out
290}
291
292fn filter_response_headers(src: &HeaderMap) -> HeaderMap {
293 let extra = hop_by_hop_connection_tokens(src);
294 let mut out = HeaderMap::new();
295 for (name, value) in src.iter() {
296 let name_lower = name.as_str().to_ascii_lowercase();
297 if is_hop_by_hop_header(&name_lower)
299 || name_lower == "content-length"
300 || name_lower == "content-encoding"
301 {
302 continue;
303 }
304 if extra.iter().any(|t| t == &name_lower) {
305 continue;
306 }
307 out.append(name.clone(), value.clone());
308 }
309 out
310}
311
312fn header_map_to_entries(headers: &HeaderMap) -> Vec<HeaderEntry> {
313 fn is_sensitive(name_lower: &str) -> bool {
314 matches!(
315 name_lower,
316 "authorization"
317 | "proxy-authorization"
318 | "cookie"
319 | "set-cookie"
320 | "x-api-key"
321 | "x-forwarded-api-key"
322 | "x-goog-api-key"
323 )
324 }
325
326 let mut out = Vec::new();
327 for (name, value) in headers.iter() {
328 let name_lower = name.as_str().to_ascii_lowercase();
329 let v = if is_sensitive(name_lower.as_str()) {
330 "[REDACTED]".to_string()
331 } else {
332 String::from_utf8_lossy(value.as_bytes()).into_owned()
333 };
334 out.push(HeaderEntry {
335 name: name.as_str().to_string(),
336 value: v,
337 });
338 }
339 out
340}
341
342#[derive(Clone)]
343struct HttpDebugBase {
344 debug_max_body_bytes: usize,
345 warn_max_body_bytes: usize,
346 #[allow(dead_code)]
347 request_body_len: usize,
348 #[allow(dead_code)]
349 upstream_request_body_len: usize,
350 client_uri: String,
351 target_url: String,
352 client_headers: Vec<HeaderEntry>,
353 upstream_request_headers: Vec<HeaderEntry>,
354 auth_resolution: Option<AuthResolutionLog>,
355 client_body_debug: Option<BodyPreview>,
356 upstream_request_body_debug: Option<BodyPreview>,
357 client_body_warn: Option<BodyPreview>,
358 upstream_request_body_warn: Option<BodyPreview>,
359}
360
361fn warn_http_debug(status_code: u16, http_debug: &HttpDebugLog) {
362 let max_chars = 2048usize;
363 let Ok(mut json) = serde_json::to_string(http_debug) else {
364 return;
365 };
366 if json.chars().count() > max_chars {
367 json = json.chars().take(max_chars).collect::<String>() + "...[TRUNCATED_FOR_LOG]";
368 }
369 warn!("upstream non-2xx http_debug={json} status_code={status_code}");
370}
371
372#[derive(Clone)]
374pub struct ProxyService {
375 pub client: Client,
376 config: Arc<RuntimeConfig>,
377 pub service_name: &'static str,
378 lb_states: Arc<Mutex<HashMap<String, LbState>>>,
379 filter: RequestFilter,
380 state: Arc<ProxyState>,
381}
382
383impl ProxyService {
384 pub fn new(
385 client: Client,
386 config: Arc<ProxyConfig>,
387 service_name: &'static str,
388 lb_states: Arc<Mutex<HashMap<String, LbState>>>,
389 ) -> Self {
390 let state = ProxyState::new_with_lb_states(Some(lb_states.clone()));
391 ProxyState::spawn_cleanup_task(state.clone());
392 if !cfg!(test) {
393 let state = state.clone();
394 let log_path = crate::config::proxy_home_dir()
395 .join("logs")
396 .join("requests.jsonl");
397 let mut base_url_to_provider_id = HashMap::new();
398 let mgr = match service_name {
399 "claude" => &config.claude,
400 _ => &config.codex,
401 };
402 for svc in mgr.configs.values() {
403 for up in &svc.upstreams {
404 if let Some(pid) = up.tags.get("provider_id") {
405 base_url_to_provider_id.insert(up.base_url.clone(), pid.clone());
406 }
407 }
408 }
409 tokio::spawn(async move {
410 let _ = state
411 .replay_usage_from_requests_log(service_name, log_path, base_url_to_provider_id)
412 .await;
413 });
414 }
415 Self {
416 client,
417 config: Arc::new(RuntimeConfig::new(config)),
418 service_name,
419 lb_states,
420 filter: RequestFilter::new(),
421 state,
422 }
423 }
424
425 fn service_manager<'a>(&self, cfg: &'a ProxyConfig) -> &'a ServiceConfigManager {
426 match self.service_name {
427 "codex" => &cfg.codex,
428 "claude" => &cfg.claude,
429 _ => &cfg.codex,
430 }
431 }
432
433 async fn pinned_config(&self, session_id: Option<&str>) -> Option<(String, &'static str)> {
434 if let Some(sid) = session_id
435 && let Some(name) = self.state.get_session_config_override(sid).await
436 && !name.trim().is_empty()
437 {
438 return Some((name, "session"));
439 }
440 if let Some(name) = self.state.get_global_config_override().await
441 && !name.trim().is_empty()
442 {
443 return Some((name, "global"));
444 }
445 None
446 }
447
448 async fn lbs_for_request(
449 &self,
450 cfg: &ProxyConfig,
451 session_id: Option<&str>,
452 ) -> Vec<LoadBalancer> {
453 let mgr = self.service_manager(cfg);
454 let meta_overrides = self
455 .state
456 .get_config_meta_overrides(self.service_name)
457 .await;
458 if let Some((name, source)) = self.pinned_config(session_id).await {
459 if let Some(svc) = mgr
460 .configs
461 .get(&name)
462 .or_else(|| mgr.active_config())
463 .cloned()
464 {
465 log_retry_trace(serde_json::json!({
466 "event": "lbs_for_request",
467 "service": self.service_name,
468 "session_id": session_id,
469 "mode": "pinned",
470 "pinned_source": source,
471 "pinned_name": name,
472 "selected_config": svc.name,
473 "selected_level": svc.level.clamp(1, 10),
474 "selected_upstreams": svc.upstreams.len(),
475 "active_config": mgr.active.as_deref(),
476 "config_count": mgr.configs.len(),
477 }));
478 return vec![LoadBalancer::new(Arc::new(svc), self.lb_states.clone())];
479 }
480 log_retry_trace(serde_json::json!({
481 "event": "lbs_for_request",
482 "service": self.service_name,
483 "session_id": session_id,
484 "mode": "pinned",
485 "pinned_source": source,
486 "pinned_name": name,
487 "selected_config": null,
488 "active_config": mgr.active.as_deref(),
489 "config_count": mgr.configs.len(),
490 "note": "pinned_config_not_found",
491 }));
492 return Vec::new();
493 }
494
495 let active_name = mgr.active.as_deref();
496 let mut configs = mgr
497 .configs
498 .iter()
499 .filter(|(name, svc)| {
500 let (enabled_ovr, _) = meta_overrides
501 .get(name.as_str())
502 .copied()
503 .unwrap_or((None, None));
504 let enabled = enabled_ovr.unwrap_or(svc.enabled);
505 !svc.upstreams.is_empty()
506 && (enabled || active_name.is_some_and(|n| n == name.as_str()))
507 })
508 .collect::<Vec<_>>();
509
510 let has_multi_level = {
511 let mut levels = configs
512 .iter()
513 .map(|(name, svc)| {
514 let (_, level_ovr) = meta_overrides
515 .get(name.as_str())
516 .copied()
517 .unwrap_or((None, None));
518 level_ovr.unwrap_or(svc.level).clamp(1, 10)
519 })
520 .collect::<Vec<_>>();
521 levels.sort_unstable();
522 levels.dedup();
523 levels.len() > 1
524 };
525
526 if !has_multi_level {
527 let eligible_details = || {
528 configs
529 .iter()
530 .map(|(name, svc)| {
531 let (_, level_ovr) = meta_overrides
532 .get(name.as_str())
533 .copied()
534 .unwrap_or((None, None));
535 serde_json::json!({
536 "name": (*name).clone(),
537 "level": level_ovr.unwrap_or(svc.level).clamp(1, 10),
538 "enabled": svc.enabled,
539 "upstreams": svc.upstreams.len(),
540 })
541 })
542 .collect::<Vec<_>>()
543 };
544
545 let mut ordered = configs
546 .iter()
547 .map(|(name, svc)| ((*name).clone(), (*svc).clone()))
548 .collect::<Vec<_>>();
549 ordered.sort_by(|(a, _), (b, _)| a.cmp(b));
550 if let Some(active) = active_name
551 && let Some(pos) = ordered.iter().position(|(n, _)| n == active)
552 {
553 let item = ordered.remove(pos);
554 ordered.insert(0, item);
555 }
556
557 let lbs = ordered
558 .into_iter()
559 .map(|(_, svc)| LoadBalancer::new(Arc::new(svc), self.lb_states.clone()))
560 .collect::<Vec<_>>();
561 if !lbs.is_empty() {
562 log_retry_trace(serde_json::json!({
563 "event": "lbs_for_request",
564 "service": self.service_name,
565 "session_id": session_id,
566 "mode": "single_level_multi",
567 "active_config": active_name,
568 "selected_configs": lbs.iter().map(|lb| lb.service.name.clone()).collect::<Vec<_>>(),
569 "eligible_configs": configs.iter().map(|(n, _)| (*n).clone()).collect::<Vec<_>>(),
570 "eligible_details": eligible_details(),
571 "eligible_count": configs.len(),
572 }));
573 return lbs;
574 }
575
576 if let Some(svc) = mgr.active_config().cloned() {
577 log_retry_trace(serde_json::json!({
578 "event": "lbs_for_request",
579 "service": self.service_name,
580 "session_id": session_id,
581 "mode": "single_level_fallback_active_config",
582 "active_config": active_name,
583 "selected_config": svc.name,
584 "selected_level": svc.level.clamp(1, 10),
585 "selected_upstreams": svc.upstreams.len(),
586 "eligible_configs": configs.iter().map(|(n, _)| (*n).clone()).collect::<Vec<_>>(),
587 "eligible_details": eligible_details(),
588 "eligible_count": configs.len(),
589 }));
590 return vec![LoadBalancer::new(Arc::new(svc), self.lb_states.clone())];
591 }
592
593 log_retry_trace(serde_json::json!({
594 "event": "lbs_for_request",
595 "service": self.service_name,
596 "session_id": session_id,
597 "mode": "single_level_empty",
598 "active_config": active_name,
599 "eligible_configs": configs.iter().map(|(n, _)| (*n).clone()).collect::<Vec<_>>(),
600 "eligible_details": eligible_details(),
601 "eligible_count": configs.len(),
602 }));
603 return Vec::new();
604 }
605
606 configs.sort_by(|(a_name, a), (b_name, b)| {
607 let a_level = meta_overrides
608 .get(a_name.as_str())
609 .and_then(|(_, l)| *l)
610 .unwrap_or(a.level)
611 .clamp(1, 10);
612 let b_level = meta_overrides
613 .get(b_name.as_str())
614 .and_then(|(_, l)| *l)
615 .unwrap_or(b.level)
616 .clamp(1, 10);
617 let a_active = active_name.is_some_and(|n| n == a_name.as_str());
618 let b_active = active_name.is_some_and(|n| n == b_name.as_str());
619 a_level
620 .cmp(&b_level)
621 .then_with(|| b_active.cmp(&a_active))
622 .then_with(|| a_name.cmp(b_name))
623 });
624
625 let lbs = configs
626 .into_iter()
627 .map(|(_, svc)| LoadBalancer::new(Arc::new(svc.clone()), self.lb_states.clone()))
628 .collect::<Vec<_>>();
629 if !lbs.is_empty() {
630 log_retry_trace(serde_json::json!({
631 "event": "lbs_for_request",
632 "service": self.service_name,
633 "session_id": session_id,
634 "mode": "multi_level",
635 "active_config": active_name,
636 "eligible_configs": lbs.iter().map(|lb| serde_json::json!({
637 "name": lb.service.name,
638 "level": lb.service.level.clamp(1, 10),
639 "upstreams": lb.service.upstreams.len(),
640 })).collect::<Vec<_>>(),
641 "eligible_count": lbs.len(),
642 }));
643 return lbs;
644 }
645
646 if let Some(svc) = mgr.active_config().cloned() {
647 log_retry_trace(serde_json::json!({
648 "event": "lbs_for_request",
649 "service": self.service_name,
650 "session_id": session_id,
651 "mode": "multi_level_fallback_active_config",
652 "active_config": active_name,
653 "selected_config": svc.name,
654 "selected_level": svc.level.clamp(1, 10),
655 "selected_upstreams": svc.upstreams.len(),
656 }));
657 return vec![LoadBalancer::new(Arc::new(svc), self.lb_states.clone())];
658 }
659 log_retry_trace(serde_json::json!({
660 "event": "lbs_for_request",
661 "service": self.service_name,
662 "session_id": session_id,
663 "mode": "multi_level_empty",
664 "active_config": active_name,
665 }));
666 Vec::new()
667 }
668
669 fn build_target(
670 &self,
671 upstream: &SelectedUpstream,
672 uri: &Uri,
673 ) -> Result<(reqwest::Url, HeaderMap)> {
674 let base = upstream.upstream.base_url.trim_end_matches('/').to_string();
675
676 let base_url = reqwest::Url::parse(&base)
677 .map_err(|e| anyhow!("invalid upstream base_url {base}: {e}"))?;
678 let base_path = base_url.path().trim_end_matches('/').to_string();
679
680 let mut path = uri.path().to_string();
681 if !base_path.is_empty()
682 && base_path != "/"
683 && (path == base_path || path.starts_with(&format!("{base_path}/")))
684 {
685 let rest = &path[base_path.len()..];
688 path = if rest.is_empty() {
689 "/".to_string()
690 } else {
691 rest.to_string()
692 };
693 if !path.starts_with('/') {
694 path = format!("/{path}");
695 }
696 }
697 let path_and_query = if let Some(q) = uri.query() {
698 format!("{path}?{q}")
699 } else {
700 path
701 };
702
703 let full = format!("{base}{path_and_query}");
704 let url =
705 reqwest::Url::parse(&full).map_err(|e| anyhow!("invalid upstream url {full}: {e}"))?;
706
707 let headers = HeaderMap::new();
709 Ok((url, headers))
710 }
711
712 pub fn state_handle(&self) -> Arc<ProxyState> {
713 self.state.clone()
714 }
715}
716
717fn header_str<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> {
718 headers.get(name).and_then(|v| v.to_str().ok())
719}
720
721fn extract_session_id(headers: &HeaderMap) -> Option<String> {
722 header_str(headers, "session_id")
723 .or_else(|| header_str(headers, "conversation_id"))
724 .map(|s| s.to_string())
725}
726
727fn extract_reasoning_effort_from_request_body(body: &[u8]) -> Option<String> {
728 let v: serde_json::Value = serde_json::from_slice(body).ok()?;
729 v.get("reasoning")
730 .and_then(|r| r.get("effort"))
731 .and_then(|e| e.as_str())
732 .map(|s| s.to_string())
733}
734
735fn extract_model_from_request_body(body: &[u8]) -> Option<String> {
736 let v: serde_json::Value = serde_json::from_slice(body).ok()?;
737 v.get("model")
738 .and_then(|m| m.as_str())
739 .map(|s| s.to_string())
740}
741
742fn apply_reasoning_effort_override(body: &[u8], effort: &str) -> Option<Vec<u8>> {
743 let mut v: serde_json::Value = serde_json::from_slice(body).ok()?;
744 let reasoning = v.get_mut("reasoning").and_then(|r| r.as_object_mut());
745 if let Some(obj) = reasoning {
746 obj.insert(
747 "effort".to_string(),
748 serde_json::Value::String(effort.to_string()),
749 );
750 } else {
751 let mut obj = serde_json::Map::new();
752 obj.insert(
753 "effort".to_string(),
754 serde_json::Value::String(effort.to_string()),
755 );
756 v.as_object_mut()?
757 .insert("reasoning".to_string(), serde_json::Value::Object(obj));
758 }
759 serde_json::to_vec(&v).ok()
760}
761
762fn apply_model_override(body: &[u8], model: &str) -> Option<Vec<u8>> {
763 let mut v: serde_json::Value = serde_json::from_slice(body).ok()?;
764 v.as_object_mut()?.insert(
765 "model".to_string(),
766 serde_json::Value::String(model.to_string()),
767 );
768 serde_json::to_vec(&v).ok()
769}
770
771#[instrument(skip_all, fields(service = %proxy.service_name))]
772pub async fn handle_proxy(
773 proxy: ProxyService,
774 req: Request<Body>,
775) -> Result<Response<Body>, (StatusCode, String)> {
776 let start = Instant::now();
777 let started_at_ms = std::time::SystemTime::now()
778 .duration_since(std::time::UNIX_EPOCH)
779 .map(|d| d.as_millis() as u64)
780 .unwrap_or(0);
781
782 let (parts, body) = req.into_parts();
783 let uri = parts.uri;
784 let method = parts.method;
785 let client_headers = parts.headers;
786 let client_headers_entries_cache: OnceLock<Vec<HeaderEntry>> = OnceLock::new();
787
788 let session_id = extract_session_id(&client_headers);
789
790 proxy.config.maybe_reload_from_disk().await;
791 let cfg_snapshot = proxy.config.snapshot().await;
792 let lbs = proxy
793 .lbs_for_request(cfg_snapshot.as_ref(), session_id.as_deref())
794 .await;
795 if lbs.is_empty() {
796 let dur = start.elapsed().as_millis() as u64;
797 let status = StatusCode::BAD_GATEWAY;
798 let client_headers_entries = client_headers_entries_cache
799 .get_or_init(|| header_map_to_entries(&client_headers))
800 .clone();
801 let http_debug = if should_include_http_warn(status.as_u16()) {
802 Some(HttpDebugLog {
803 request_body_len: None,
804 upstream_request_body_len: None,
805 upstream_headers_ms: None,
806 upstream_first_chunk_ms: None,
807 upstream_body_read_ms: None,
808 upstream_error_class: Some("no_active_upstream_config".to_string()),
809 upstream_error_hint: Some(
810 "未找到任何可用的上游配置(active_config 为空或 upstreams 为空)。".to_string(),
811 ),
812 upstream_cf_ray: None,
813 client_uri: uri.to_string(),
814 target_url: "-".to_string(),
815 client_headers: client_headers_entries,
816 upstream_request_headers: Vec::new(),
817 auth_resolution: None,
818 client_body: None,
819 upstream_request_body: None,
820 upstream_response_headers: None,
821 upstream_response_body: None,
822 upstream_error: Some("no active upstream config".to_string()),
823 })
824 } else {
825 None
826 };
827 log_request_with_debug(
828 proxy.service_name,
829 method.as_str(),
830 uri.path(),
831 status.as_u16(),
832 dur,
833 None,
834 "-",
835 None,
836 "-",
837 session_id.clone(),
838 None,
839 None,
840 None,
841 None,
842 http_debug,
843 );
844 return Err((status, "no active upstream config".to_string()));
845 }
846 let client_content_type = client_headers
847 .get("content-type")
848 .and_then(|v| v.to_str().ok());
849
850 let is_stream = client_headers
852 .get("accept")
853 .and_then(|v| v.to_str().ok())
854 .map(|s| s.contains("text/event-stream"))
855 .unwrap_or(false);
856
857 let path = uri.path();
858 let is_responses_path = path.ends_with("/responses");
859 let is_user_turn = method == Method::POST && is_responses_path;
860 let is_codex_service = proxy.service_name == "codex";
861
862 let cwd = if let Some(id) = session_id.as_deref() {
863 proxy.state.resolve_session_cwd(id).await
864 } else {
865 None
866 };
867 if let Some(id) = session_id.as_deref() {
868 proxy.state.touch_session_override(id, started_at_ms).await;
869 proxy
870 .state
871 .touch_session_config_override(id, started_at_ms)
872 .await;
873 }
874
875 let raw_body = match to_bytes(body, 10 * 1024 * 1024).await {
877 Ok(b) => b,
878 Err(e) => {
879 let dur = start.elapsed().as_millis() as u64;
880 let status = StatusCode::BAD_REQUEST;
881 let err_str = e.to_string();
882 let client_headers_entries = client_headers_entries_cache
883 .get_or_init(|| header_map_to_entries(&client_headers))
884 .clone();
885 let http_debug = if should_include_http_warn(status.as_u16()) {
886 Some(HttpDebugLog {
887 request_body_len: None,
888 upstream_request_body_len: None,
889 upstream_headers_ms: None,
890 upstream_first_chunk_ms: None,
891 upstream_body_read_ms: None,
892 upstream_error_class: Some("client_body_read_error".to_string()),
893 upstream_error_hint: Some(
894 "读取客户端请求 body 失败(可能超过大小限制或连接中断)。".to_string(),
895 ),
896 upstream_cf_ray: None,
897 client_uri: uri.to_string(),
898 target_url: "-".to_string(),
899 client_headers: client_headers_entries,
900 upstream_request_headers: Vec::new(),
901 auth_resolution: None,
902 client_body: None,
903 upstream_request_body: None,
904 upstream_response_headers: None,
905 upstream_response_body: None,
906 upstream_error: Some(err_str.clone()),
907 })
908 } else {
909 None
910 };
911 log_request_with_debug(
912 proxy.service_name,
913 method.as_str(),
914 uri.path(),
915 status.as_u16(),
916 dur,
917 None,
918 "-",
919 None,
920 "-",
921 session_id.clone(),
922 cwd.clone(),
923 None,
924 None,
925 None,
926 http_debug,
927 );
928 return Err((status, err_str));
929 }
930 };
931 let original_effort = extract_reasoning_effort_from_request_body(&raw_body);
932 let override_effort = if let Some(id) = session_id.as_deref() {
933 proxy.state.get_session_effort_override(id).await
934 } else {
935 None
936 };
937 let effective_effort = override_effort.clone().or(original_effort.clone());
938
939 let body_for_upstream = if let Some(ref effort) = override_effort {
940 Bytes::from(
941 apply_reasoning_effort_override(&raw_body, effort)
942 .unwrap_or_else(|| raw_body.as_ref().to_vec()),
943 )
944 } else {
945 raw_body.clone()
946 };
947 let request_model = extract_model_from_request_body(body_for_upstream.as_ref());
948 let request_body_len = raw_body.len();
949
950 let debug_opt = http_debug_options();
951 let warn_opt = http_warn_options();
952 let debug_max = if debug_opt.enabled {
953 debug_opt.max_body_bytes
954 } else {
955 0
956 };
957 let warn_max = if warn_opt.enabled {
958 warn_opt.max_body_bytes
959 } else {
960 0
961 };
962 let request_body_previews = should_log_request_body_preview();
963 let client_body_debug = if request_body_previews && debug_max > 0 {
964 Some(make_body_preview(&raw_body, client_content_type, debug_max))
965 } else {
966 None
967 };
968 let client_body_warn = if request_body_previews && warn_max > 0 {
969 Some(make_body_preview(&raw_body, client_content_type, warn_max))
970 } else {
971 None
972 };
973
974 let request_id = proxy
975 .state
976 .begin_request(
977 proxy.service_name,
978 method.as_str(),
979 uri.path(),
980 session_id.clone(),
981 cwd.clone(),
982 request_model.clone(),
983 effective_effort.clone(),
984 started_at_ms,
985 )
986 .await;
987
988 let retry_cfg = cfg_snapshot.retry.resolve();
989 let plan = retry_plan(&retry_cfg);
990 let upstream_opt = &plan.upstream;
991 let provider_opt = &plan.provider;
992 let cooldown_backoff = crate::lb::CooldownBackoff {
993 factor: plan.cooldown_backoff_factor,
994 max_secs: plan.cooldown_backoff_max_secs,
995 };
996 log_retry_trace(serde_json::json!({
997 "event": "retry_options",
998 "service": proxy.service_name,
999 "request_id": request_id,
1000 "upstream": {
1001 "max_attempts": upstream_opt.max_attempts,
1002 "base_backoff_ms": upstream_opt.base_backoff_ms,
1003 "max_backoff_ms": upstream_opt.max_backoff_ms,
1004 "jitter_ms": upstream_opt.jitter_ms,
1005 "retry_status_ranges": upstream_opt.retry_status_ranges,
1006 "retry_error_classes": upstream_opt.retry_error_classes,
1007 "strategy": if upstream_opt.strategy == RetryStrategy::Failover { "failover" } else { "same_upstream" },
1008 },
1009 "provider": {
1010 "max_attempts": provider_opt.max_attempts,
1011 "base_backoff_ms": provider_opt.base_backoff_ms,
1012 "max_backoff_ms": provider_opt.max_backoff_ms,
1013 "jitter_ms": provider_opt.jitter_ms,
1014 "retry_status_ranges": provider_opt.retry_status_ranges,
1015 "retry_error_classes": provider_opt.retry_error_classes,
1016 "strategy": if provider_opt.strategy == RetryStrategy::Failover { "failover" } else { "same_upstream" },
1017 },
1018 "never_status_ranges": plan.never_status_ranges,
1019 "never_error_classes": plan.never_error_classes,
1020 "cloudflare_challenge_cooldown_secs": plan.cloudflare_challenge_cooldown_secs,
1021 "cloudflare_timeout_cooldown_secs": plan.cloudflare_timeout_cooldown_secs,
1022 "transport_cooldown_secs": plan.transport_cooldown_secs,
1023 "cooldown_backoff_factor": plan.cooldown_backoff_factor,
1024 "cooldown_backoff_max_secs": plan.cooldown_backoff_max_secs,
1025 }));
1026 let total_upstreams = lbs
1027 .iter()
1028 .map(|lb| lb.service.upstreams.len())
1029 .sum::<usize>();
1030 let mut avoid: HashMap<String, HashSet<usize>> = HashMap::new();
1031 let mut upstream_chain: Vec<String> = Vec::new();
1032 let mut avoided_total: usize = 0;
1033
1034 let mut tried_configs: HashSet<String> = HashSet::new();
1042 let strict_multi_config = lbs.len() > 1;
1043 let mut global_attempt: u32 = 0;
1044 let mut last_err: Option<(StatusCode, String)> = None;
1045
1046 for provider_attempt in 0..provider_opt.max_attempts {
1047 let mut provider_lb: Option<LoadBalancer> = None;
1049 for lb in &lbs {
1050 if tried_configs.contains(&lb.service.name) {
1051 continue;
1052 }
1053 provider_lb = Some(lb.clone());
1054 break;
1055 }
1056 let Some(lb) = provider_lb else {
1057 break;
1058 };
1059 let config_name = lb.service.name.clone();
1060
1061 'upstreams: loop {
1063 let avoid_set = avoid.entry(config_name.clone()).or_default();
1064 let upstream_total = lb.service.upstreams.len();
1065 if upstream_total > 0 && avoid_set.len() >= upstream_total {
1066 break 'upstreams;
1067 }
1068
1069 let selected = loop {
1071 let upstream_total = lb.service.upstreams.len();
1072 if upstream_total > 0 && avoid_set.len() >= upstream_total {
1073 break None;
1074 }
1075 let next = {
1076 let avoid_ref: &HashSet<usize> = &*avoid_set;
1077 if strict_multi_config {
1078 lb.select_upstream_avoiding_strict(avoid_ref)
1079 } else {
1080 lb.select_upstream_avoiding(avoid_ref)
1081 }
1082 };
1083 let Some(selected) = next else {
1084 break None;
1085 };
1086
1087 if let Some(ref requested_model) = request_model {
1088 let supported = model_routing::is_model_supported(
1089 &selected.upstream.supported_models,
1090 &selected.upstream.model_mapping,
1091 requested_model,
1092 );
1093 if !supported {
1094 upstream_chain.push(format!(
1095 "{}:{} (idx={}) skipped_unsupported_model={}",
1096 selected.config_name,
1097 selected.upstream.base_url,
1098 selected.index,
1099 requested_model
1100 ));
1101 if avoid_set.insert(selected.index) {
1102 avoided_total = avoided_total.saturating_add(1);
1103 }
1104 continue;
1105 }
1106 }
1107
1108 break Some(selected);
1109 };
1110
1111 let Some(selected) = selected else {
1112 break 'upstreams;
1113 };
1114
1115 let mut model_note = "-".to_string();
1116 let mut body_for_selected = body_for_upstream.clone();
1117 if let Some(ref requested_model) = request_model {
1118 let effective_model = model_routing::effective_model(
1119 &selected.upstream.model_mapping,
1120 requested_model,
1121 );
1122 if effective_model != *requested_model {
1123 if let Some(modified) =
1124 apply_model_override(body_for_upstream.as_ref(), effective_model.as_str())
1125 {
1126 body_for_selected = Bytes::from(modified);
1127 }
1128 model_note = format!("{requested_model}->{effective_model}");
1129 } else {
1130 model_note = requested_model.clone();
1131 }
1132 }
1133
1134 let filtered_body = proxy.filter.apply_bytes(body_for_selected);
1135 let upstream_request_body_len = filtered_body.len();
1136 let upstream_request_body_debug = if request_body_previews && debug_max > 0 {
1137 Some(make_body_preview(
1138 &filtered_body,
1139 client_content_type,
1140 debug_max,
1141 ))
1142 } else {
1143 None
1144 };
1145 let upstream_request_body_warn = if request_body_previews && warn_max > 0 {
1146 Some(make_body_preview(
1147 &filtered_body,
1148 client_content_type,
1149 warn_max,
1150 ))
1151 } else {
1152 None
1153 };
1154
1155 for upstream_attempt in 0..upstream_opt.max_attempts {
1157 global_attempt = global_attempt.saturating_add(1);
1158 let provider_id = selected.upstream.tags.get("provider_id").cloned();
1159 let mut avoid_for_config = avoid_set.iter().copied().collect::<Vec<_>>();
1160 avoid_for_config.sort_unstable();
1161
1162 log_retry_trace(serde_json::json!({
1163 "event": "attempt_select",
1164 "service": proxy.service_name,
1165 "request_id": request_id,
1166 "attempt": global_attempt,
1167 "provider_attempt": provider_attempt + 1,
1168 "upstream_attempt": upstream_attempt + 1,
1169 "provider_max_attempts": provider_opt.max_attempts,
1170 "upstream_max_attempts": upstream_opt.max_attempts,
1171 "config_name": selected.config_name.as_str(),
1172 "upstream_index": selected.index,
1173 "upstream_base_url": selected.upstream.base_url.as_str(),
1174 "provider_id": provider_id.as_deref(),
1175 "avoid_for_config": avoid_for_config,
1176 "avoided_total": avoided_total,
1177 "total_upstreams": total_upstreams,
1178 "model": model_note.as_str(),
1179 }));
1180
1181 let target_url = match proxy.build_target(&selected, &uri) {
1182 Ok((url, _headers)) => url,
1183 Err(e) => {
1184 let err_str = e.to_string();
1185 upstream_chain.push(format!(
1186 "{}:{} (idx={}) target_build_error={} model={}",
1187 selected.config_name,
1188 selected.upstream.base_url,
1189 selected.index,
1190 err_str,
1191 model_note.as_str()
1192 ));
1193 if avoid_set.insert(selected.index) {
1194 avoided_total = avoided_total.saturating_add(1);
1195 }
1196 last_err = Some((StatusCode::BAD_GATEWAY, err_str));
1197 break;
1198 }
1199 };
1200
1201 let mut headers = filter_request_headers(&client_headers);
1206 let client_has_auth = headers.contains_key("authorization");
1207 let (token, _token_src) = resolve_auth_token_with_source(
1208 proxy.service_name,
1209 &selected.upstream.auth,
1210 client_has_auth,
1211 );
1212 if let Some(token) = token
1213 && let Ok(v) = HeaderValue::from_str(&format!("Bearer {token}"))
1214 {
1215 headers.insert(HeaderName::from_static("authorization"), v);
1216 }
1217
1218 let client_has_x_api_key = headers.contains_key("x-api-key");
1219 let (api_key, _api_key_src) = resolve_api_key_with_source(
1220 proxy.service_name,
1221 &selected.upstream.auth,
1222 client_has_x_api_key,
1223 );
1224 if let Some(key) = api_key
1225 && let Ok(v) = HeaderValue::from_str(&key)
1226 {
1227 headers.insert(HeaderName::from_static("x-api-key"), v);
1228 }
1229
1230 let upstream_request_headers = headers.clone();
1231 proxy
1232 .state
1233 .update_request_route(
1234 request_id,
1235 selected.config_name.clone(),
1236 provider_id.clone(),
1237 selected.upstream.base_url.clone(),
1238 )
1239 .await;
1240
1241 let debug_base = if debug_max > 0 || warn_max > 0 {
1242 Some(HttpDebugBase {
1243 debug_max_body_bytes: debug_max,
1244 warn_max_body_bytes: warn_max,
1245 request_body_len,
1246 upstream_request_body_len,
1247 client_uri: uri.to_string(),
1248 target_url: target_url.to_string(),
1249 client_headers: client_headers_entries_cache
1250 .get_or_init(|| header_map_to_entries(&client_headers))
1251 .clone(),
1252 upstream_request_headers: header_map_to_entries(&upstream_request_headers),
1253 auth_resolution: None,
1254 client_body_debug: client_body_debug.clone(),
1255 upstream_request_body_debug: upstream_request_body_debug.clone(),
1256 client_body_warn: client_body_warn.clone(),
1257 upstream_request_body_warn: upstream_request_body_warn.clone(),
1258 })
1259 } else {
1260 None
1261 };
1262
1263 let builder = proxy
1264 .client
1265 .request(method.clone(), target_url.clone())
1266 .headers(headers)
1267 .body(filtered_body.clone());
1268
1269 let upstream_start = Instant::now();
1270 let resp = match builder.send().await {
1271 Ok(r) => r,
1272 Err(e) => {
1273 let err_str = format_reqwest_error_for_retry_chain(&e);
1274 upstream_chain.push(format!(
1275 "{}:{} (idx={}) transport_error={} model={}",
1276 selected.config_name,
1277 selected.upstream.base_url,
1278 selected.index,
1279 err_str,
1280 model_note.as_str()
1281 ));
1282 let can_retry_upstream = upstream_attempt + 1 < upstream_opt.max_attempts
1284 && should_retry_class(upstream_opt, Some("upstream_transport_error"));
1285 if can_retry_upstream {
1286 backoff_sleep(upstream_opt, upstream_attempt).await;
1287 continue;
1288 }
1289
1290 lb.penalize_with_backoff(
1291 selected.index,
1292 plan.transport_cooldown_secs,
1293 "upstream_transport_error",
1294 cooldown_backoff,
1295 );
1296 if avoid_set.insert(selected.index) {
1297 avoided_total = avoided_total.saturating_add(1);
1298 }
1299 last_err = Some((StatusCode::BAD_GATEWAY, err_str));
1300 break;
1301 }
1302 };
1303
1304 let upstream_headers_ms = upstream_start.elapsed().as_millis() as u64;
1305 let status = resp.status();
1306 let success = status.is_success();
1307 let resp_headers = resp.headers().clone();
1308 let resp_headers_filtered = filter_response_headers(&resp_headers);
1309
1310 if is_stream && success {
1311 lb.record_result_with_backoff(
1312 selected.index,
1313 true,
1314 crate::lb::COOLDOWN_SECS,
1315 cooldown_backoff,
1316 );
1317 let retry = retry_info_for_chain(&upstream_chain);
1318 return Ok(build_sse_success_response(
1319 &proxy,
1320 lb.clone(),
1321 selected.clone(),
1322 resp,
1323 SseSuccessMeta {
1324 status,
1325 resp_headers,
1326 resp_headers_filtered,
1327 start,
1328 started_at_ms,
1329 upstream_start,
1330 upstream_headers_ms,
1331 request_body_len,
1332 upstream_request_body_len,
1333 debug_base,
1334 retry,
1335 session_id: session_id.clone(),
1336 cwd: cwd.clone(),
1337 effective_effort: effective_effort.clone(),
1338 request_id,
1339 is_user_turn,
1340 is_codex_service,
1341 transport_cooldown_secs: plan.transport_cooldown_secs,
1342 cooldown_backoff,
1343 method: method.clone(),
1344 path: uri.path().to_string(),
1345 },
1346 )
1347 .await);
1348 }
1349
1350 let bytes = match resp.bytes().await {
1351 Ok(b) => b,
1352 Err(e) => {
1353 let err_str = format_reqwest_error_for_retry_chain(&e);
1354 upstream_chain.push(format!(
1355 "{}:{} (idx={}) body_read_error={} model={}",
1356 selected.config_name,
1357 selected.upstream.base_url,
1358 selected.index,
1359 err_str,
1360 model_note.as_str()
1361 ));
1362 let can_retry_upstream = upstream_attempt + 1 < upstream_opt.max_attempts
1363 && should_retry_class(upstream_opt, Some("upstream_transport_error"));
1364 if can_retry_upstream {
1365 backoff_sleep(upstream_opt, upstream_attempt).await;
1366 continue;
1367 }
1368 lb.penalize_with_backoff(
1369 selected.index,
1370 plan.transport_cooldown_secs,
1371 "upstream_body_read_error",
1372 cooldown_backoff,
1373 );
1374 if avoid_set.insert(selected.index) {
1375 avoided_total = avoided_total.saturating_add(1);
1376 }
1377 last_err = Some((StatusCode::BAD_GATEWAY, err_str));
1378 break;
1379 }
1380 };
1381
1382 let _upstream_body_read_ms = upstream_start.elapsed().as_millis() as u64;
1383 let dur = start.elapsed().as_millis() as u64;
1384
1385 let status_code = status.as_u16();
1386 let (cls, _hint, _cf_ray) =
1387 classify_upstream_response(status_code, &resp_headers, bytes.as_ref());
1388 let never_retry = should_never_retry(&plan, status_code, cls.as_deref());
1389
1390 upstream_chain.push(format!(
1391 "{} (idx={}) status={} class={} model={}",
1392 selected.upstream.base_url,
1393 selected.index,
1394 status_code,
1395 cls.as_deref().unwrap_or("-"),
1396 model_note.as_str()
1397 ));
1398
1399 if success {
1400 lb.record_result_with_backoff(
1401 selected.index,
1402 true,
1403 crate::lb::COOLDOWN_SECS,
1404 cooldown_backoff,
1405 );
1406
1407 let usage = extract_usage_from_bytes(&bytes);
1408 let usage_for_log = usage.clone();
1409 let retry = retry_info_for_chain(&upstream_chain);
1410 let retry_for_log = retry.clone();
1411 proxy
1412 .state
1413 .finish_request(crate::state::FinishRequestParams {
1414 id: request_id,
1415 status_code,
1416 duration_ms: dur,
1417 ended_at_ms: started_at_ms + dur,
1418 usage,
1419 retry,
1420 ttfb_ms: Some(upstream_headers_ms),
1421 })
1422 .await;
1423
1424 log_request_with_debug(
1425 proxy.service_name,
1426 method.as_str(),
1427 uri.path(),
1428 status_code,
1429 dur,
1430 Some(upstream_headers_ms),
1431 &selected.config_name,
1432 provider_id.clone(),
1433 &selected.upstream.base_url,
1434 session_id.clone(),
1435 cwd.clone(),
1436 effective_effort.clone(),
1437 usage_for_log,
1438 retry_for_log,
1439 None,
1440 );
1441
1442 let mut builder = Response::builder().status(status);
1443 for (name, value) in resp_headers_filtered.iter() {
1444 builder = builder.header(name, value);
1445 }
1446 return Ok(builder.body(Body::from(bytes)).unwrap());
1447 }
1448
1449 if never_retry {
1450 lb.record_result_with_backoff(
1451 selected.index,
1452 false,
1453 crate::lb::COOLDOWN_SECS,
1454 cooldown_backoff,
1455 );
1456
1457 let retry = retry_info_for_chain(&upstream_chain);
1458 let retry_for_log = retry.clone();
1459 proxy
1460 .state
1461 .finish_request(crate::state::FinishRequestParams {
1462 id: request_id,
1463 status_code,
1464 duration_ms: dur,
1465 ended_at_ms: started_at_ms + dur,
1466 usage: None,
1467 retry,
1468 ttfb_ms: Some(upstream_headers_ms),
1469 })
1470 .await;
1471
1472 log_request_with_debug(
1473 proxy.service_name,
1474 method.as_str(),
1475 uri.path(),
1476 status_code,
1477 dur,
1478 Some(upstream_headers_ms),
1479 &selected.config_name,
1480 provider_id.clone(),
1481 &selected.upstream.base_url,
1482 session_id.clone(),
1483 cwd.clone(),
1484 effective_effort.clone(),
1485 None,
1486 retry_for_log,
1487 None,
1488 );
1489
1490 let mut builder = Response::builder().status(status);
1491 for (name, value) in resp_headers_filtered.iter() {
1492 builder = builder.header(name, value);
1493 }
1494 return Ok(builder.body(Body::from(bytes)).unwrap());
1495 }
1496
1497 let upstream_retryable = should_retry_status(upstream_opt, status_code)
1498 || should_retry_class(upstream_opt, cls.as_deref());
1499 let can_retry_upstream =
1500 upstream_retryable && upstream_attempt + 1 < upstream_opt.max_attempts;
1501 if can_retry_upstream {
1502 retry_sleep(upstream_opt, upstream_attempt, &resp_headers).await;
1503 continue;
1504 }
1505
1506 let provider_retryable = should_retry_status(provider_opt, status_code)
1508 || should_retry_class(provider_opt, cls.as_deref());
1509 if provider_retryable {
1510 lb.penalize_with_backoff(
1511 selected.index,
1512 plan.transport_cooldown_secs,
1513 &format!("status_{}", status_code),
1514 cooldown_backoff,
1515 );
1516 last_err = Some((status, String::from_utf8_lossy(bytes.as_ref()).to_string()));
1517
1518 if avoid_set.insert(selected.index) {
1519 avoided_total = avoided_total.saturating_add(1);
1520 }
1521 break;
1522 }
1523
1524 let retry = retry_info_for_chain(&upstream_chain);
1526 let retry_for_log = retry.clone();
1527 proxy
1528 .state
1529 .finish_request(crate::state::FinishRequestParams {
1530 id: request_id,
1531 status_code,
1532 duration_ms: dur,
1533 ended_at_ms: started_at_ms + dur,
1534 usage: None,
1535 retry,
1536 ttfb_ms: Some(upstream_headers_ms),
1537 })
1538 .await;
1539
1540 log_request_with_debug(
1541 proxy.service_name,
1542 method.as_str(),
1543 uri.path(),
1544 status_code,
1545 dur,
1546 Some(upstream_headers_ms),
1547 &selected.config_name,
1548 provider_id.clone(),
1549 &selected.upstream.base_url,
1550 session_id.clone(),
1551 cwd.clone(),
1552 effective_effort.clone(),
1553 None,
1554 retry_for_log,
1555 None,
1556 );
1557
1558 let mut builder = Response::builder().status(status);
1559 for (name, value) in resp_headers_filtered.iter() {
1560 builder = builder.header(name, value);
1561 }
1562 return Ok(builder.body(Body::from(bytes)).unwrap());
1563 }
1564
1565 let upstream_total = lb.service.upstreams.len();
1568 if upstream_total > 0 && avoid_set.len() >= upstream_total {
1569 break 'upstreams;
1570 }
1571 continue 'upstreams;
1572 }
1573
1574 tried_configs.insert(config_name);
1575
1576 if provider_opt.base_backoff_ms > 0 {
1578 backoff_sleep(provider_opt, provider_attempt).await;
1579 }
1580 }
1581
1582 if let Some((status, msg)) = last_err {
1584 let dur = start.elapsed().as_millis() as u64;
1585 log_request_with_debug(
1586 proxy.service_name,
1587 method.as_str(),
1588 uri.path(),
1589 status.as_u16(),
1590 dur,
1591 None,
1592 "-",
1593 None,
1594 "-",
1595 session_id.clone(),
1596 cwd.clone(),
1597 effective_effort.clone(),
1598 None,
1599 retry_info_for_chain(&upstream_chain),
1600 None,
1601 );
1602 let retry = retry_info_for_chain(&upstream_chain);
1603 proxy
1604 .state
1605 .finish_request(crate::state::FinishRequestParams {
1606 id: request_id,
1607 status_code: status.as_u16(),
1608 duration_ms: dur,
1609 ended_at_ms: started_at_ms + dur,
1610 usage: None,
1611 retry,
1612 ttfb_ms: None,
1613 })
1614 .await;
1615 return Err((status, msg));
1616 }
1617
1618 return Err((
1619 StatusCode::BAD_GATEWAY,
1620 "no upstreams available".to_string(),
1621 ));
1622
1623 #[cfg(any())]
1624 {
1625 for attempt_index in 0..retry_opt.max_attempts {
1626 let avoided_total = avoid.values().map(|s| s.len()).sum::<usize>();
1627 if total_upstreams > 0 && avoided_total >= total_upstreams {
1628 upstream_chain.push(format!("all_upstreams_avoided total={total_upstreams}"));
1629 break;
1630 }
1631
1632 let strict_multi_config = lbs.len() > 1;
1633
1634 let mut chosen: Option<(LoadBalancer, SelectedUpstream)> = None;
1635 for lb in &lbs {
1636 let cfg_name = lb.service.name.clone();
1637 let avoid_set = avoid.entry(cfg_name.clone()).or_default();
1638 loop {
1639 let upstream_total = lb.service.upstreams.len();
1640 if upstream_total > 0 && avoid_set.len() >= upstream_total {
1641 break;
1642 }
1643 let next = {
1644 let avoid_ref: &HashSet<usize> = &*avoid_set;
1645 if strict_multi_config {
1646 lb.select_upstream_avoiding_strict(avoid_ref)
1647 } else {
1648 lb.select_upstream_avoiding(avoid_ref)
1649 }
1650 };
1651 let Some(selected) = next else {
1652 break;
1653 };
1654
1655 if let Some(ref requested_model) = request_model {
1656 let supported = model_routing::is_model_supported(
1657 &selected.upstream.supported_models,
1658 &selected.upstream.model_mapping,
1659 requested_model,
1660 );
1661 if !supported {
1662 upstream_chain.push(format!(
1663 "{}:{} (idx={}) skipped_unsupported_model={}",
1664 selected.config_name,
1665 selected.upstream.base_url,
1666 selected.index,
1667 requested_model
1668 ));
1669 avoid_set.insert(selected.index);
1670 continue;
1671 }
1672 }
1673
1674 chosen = Some((lb.clone(), selected));
1675 break;
1676 }
1677 if chosen.is_some() {
1678 break;
1679 }
1680 }
1681
1682 if chosen.is_none() && strict_multi_config {
1686 for lb in &lbs {
1687 let cfg_name = lb.service.name.clone();
1688 let avoid_set = avoid.entry(cfg_name.clone()).or_default();
1689 let upstream_total = lb.service.upstreams.len();
1690 if upstream_total > 0 && avoid_set.len() >= upstream_total {
1691 continue;
1692 }
1693 let avoid_ref: &HashSet<usize> = &*avoid_set;
1694 if let Some(selected) = lb.select_upstream_avoiding(avoid_ref) {
1695 chosen = Some((lb.clone(), selected));
1696 break;
1697 }
1698 }
1699 }
1700
1701 let Some((lb, selected)) = chosen else {
1702 log_retry_trace(serde_json::json!({
1703 "event": "attempt_no_upstream",
1704 "service": proxy.service_name,
1705 "request_id": request_id,
1706 "attempt": attempt_index + 1,
1707 "max_attempts": retry_opt.max_attempts,
1708 "avoid": avoid.iter().map(|(k, v)| serde_json::json!({
1709 "config": k,
1710 "indices": v.iter().copied().collect::<Vec<_>>(),
1711 })).collect::<Vec<_>>(),
1712 "total_upstreams": total_upstreams,
1713 "model": request_model.clone(),
1714 }));
1715 let dur = start.elapsed().as_millis() as u64;
1716 let status = if request_model.is_some() {
1717 StatusCode::NOT_FOUND
1718 } else {
1719 StatusCode::BAD_GATEWAY
1720 };
1721 log_request_with_debug(
1722 proxy.service_name,
1723 method.as_str(),
1724 uri.path(),
1725 status.as_u16(),
1726 dur,
1727 None,
1728 "-",
1729 None,
1730 "-",
1731 session_id.clone(),
1732 cwd.clone(),
1733 effective_effort.clone(),
1734 None,
1735 retry_info_for_chain(&upstream_chain),
1736 None,
1737 );
1738 let retry = retry_info_for_chain(&upstream_chain);
1739 proxy
1740 .state
1741 .finish_request(crate::state::FinishRequestParams {
1742 id: request_id,
1743 status_code: status.as_u16(),
1744 duration_ms: dur,
1745 ended_at_ms: started_at_ms + dur,
1746 usage: None,
1747 retry,
1748 ttfb_ms: None,
1749 })
1750 .await;
1751 if let Some(model) = request_model.as_deref() {
1752 return Err((
1753 status,
1754 format!("no upstreams support requested model '{model}'"),
1755 ));
1756 }
1757 return Err((status, "no upstreams available".to_string()));
1758 };
1759
1760 let selected_config_name = selected.config_name.clone();
1761 let selected_upstream_index = selected.index;
1762 let selected_upstream_base_url = selected.upstream.base_url.clone();
1763 let provider_id = selected.upstream.tags.get("provider_id").cloned();
1764 log_retry_trace(serde_json::json!({
1765 "event": "attempt_select",
1766 "service": proxy.service_name,
1767 "request_id": request_id,
1768 "attempt": attempt_index + 1,
1769 "max_attempts": retry_opt.max_attempts,
1770 "strategy": if retry_failover { "failover" } else { "same_upstream" },
1771 "config_name": selected_config_name,
1772 "upstream_index": selected_upstream_index,
1773 "upstream_base_url": selected_upstream_base_url,
1774 "provider_id": provider_id.clone(),
1775 "model": request_model.clone(),
1776 "lb_state": lb_state_snapshot_json(&lb),
1777 "avoid_for_config": avoid.get(&selected.config_name).map(|s| s.iter().copied().collect::<Vec<_>>()),
1778 "avoided_total": avoid.values().map(|s| s.len()).sum::<usize>(),
1779 "total_upstreams": total_upstreams,
1780 }));
1781
1782 let mut model_note = "-".to_string();
1783 let mut body_for_selected = body_for_upstream.clone();
1784 if let Some(ref requested_model) = request_model {
1785 let effective_model = model_routing::effective_model(
1786 &selected.upstream.model_mapping,
1787 requested_model,
1788 );
1789 if effective_model != *requested_model {
1790 if let Some(modified) =
1791 apply_model_override(body_for_upstream.as_ref(), effective_model.as_str())
1792 {
1793 body_for_selected = Bytes::from(modified);
1794 }
1795 model_note = format!("{requested_model}->{effective_model}");
1796 } else {
1797 model_note = requested_model.clone();
1798 }
1799 }
1800
1801 let filtered_body = proxy.filter.apply_bytes(body_for_selected);
1802 let upstream_request_body_len = filtered_body.len();
1803 let upstream_request_body_debug = if request_body_previews && debug_max > 0 {
1804 Some(make_body_preview(
1805 &filtered_body,
1806 client_content_type,
1807 debug_max,
1808 ))
1809 } else {
1810 None
1811 };
1812 let upstream_request_body_warn = if request_body_previews && warn_max > 0 {
1813 Some(make_body_preview(
1814 &filtered_body,
1815 client_content_type,
1816 warn_max,
1817 ))
1818 } else {
1819 None
1820 };
1821
1822 let target_url = match proxy.build_target(&selected, &uri) {
1823 Ok((url, _headers)) => url,
1824 Err(e) => {
1825 lb.record_result_with_backoff(
1826 selected.index,
1827 false,
1828 crate::lb::COOLDOWN_SECS,
1829 cooldown_backoff,
1830 );
1831 let err_str = e.to_string();
1832 upstream_chain.push(format!(
1833 "{}:{} (idx={}) target_build_error={} model={}",
1834 selected.config_name,
1835 selected.upstream.base_url,
1836 selected.index,
1837 err_str,
1838 model_note.as_str()
1839 ));
1840 avoid
1841 .entry(selected.config_name.clone())
1842 .or_default()
1843 .insert(selected.index);
1844
1845 let can_retry = attempt_index + 1 < retry_opt.max_attempts;
1846 if can_retry {
1847 backoff_sleep(&retry_opt, attempt_index).await;
1848 continue;
1849 }
1850
1851 let dur = start.elapsed().as_millis() as u64;
1852 let status = StatusCode::BAD_GATEWAY;
1853 let client_headers_entries = client_headers_entries_cache
1854 .get_or_init(|| header_map_to_entries(&client_headers))
1855 .clone();
1856 let http_debug = if should_include_http_warn(status.as_u16()) {
1857 Some(HttpDebugLog {
1858 request_body_len: Some(request_body_len),
1859 upstream_request_body_len: Some(upstream_request_body_len),
1860 upstream_headers_ms: None,
1861 upstream_first_chunk_ms: None,
1862 upstream_body_read_ms: None,
1863 upstream_error_class: Some("target_build_error".to_string()),
1864 upstream_error_hint: Some(
1865 "构造上游 target_url 失败(通常是 base_url 配置错误)。"
1866 .to_string(),
1867 ),
1868 upstream_cf_ray: None,
1869 client_uri: uri.to_string(),
1870 target_url: "-".to_string(),
1871 client_headers: client_headers_entries,
1872 upstream_request_headers: Vec::new(),
1873 auth_resolution: None,
1874 client_body: client_body_warn.clone(),
1875 upstream_request_body: upstream_request_body_warn.clone(),
1876 upstream_response_headers: None,
1877 upstream_response_body: None,
1878 upstream_error: Some(err_str.clone()),
1879 })
1880 } else {
1881 None
1882 };
1883 log_request_with_debug(
1884 proxy.service_name,
1885 method.as_str(),
1886 uri.path(),
1887 status.as_u16(),
1888 dur,
1889 None,
1890 &selected.config_name,
1891 selected.upstream.tags.get("provider_id").cloned(),
1892 &selected.upstream.base_url,
1893 session_id.clone(),
1894 cwd.clone(),
1895 effective_effort.clone(),
1896 None,
1897 retry_info_for_chain(&upstream_chain),
1898 http_debug,
1899 );
1900 let retry = retry_info_for_chain(&upstream_chain);
1901 proxy
1902 .state
1903 .finish_request(crate::state::FinishRequestParams {
1904 id: request_id,
1905 status_code: status.as_u16(),
1906 duration_ms: dur,
1907 ended_at_ms: started_at_ms + dur,
1908 usage: None,
1909 retry,
1910 ttfb_ms: None,
1911 })
1912 .await;
1913 return Err((status, err_str));
1914 }
1915 };
1916
1917 let mut headers = filter_request_headers(&client_headers);
1922 let client_has_auth = headers.contains_key("authorization");
1923 let (token, token_src) = resolve_auth_token_with_source(
1924 proxy.service_name,
1925 &selected.upstream.auth,
1926 client_has_auth,
1927 );
1928 if let Some(token) = token
1929 && let Ok(v) = HeaderValue::from_str(&format!("Bearer {token}"))
1930 {
1931 headers.insert(HeaderName::from_static("authorization"), v);
1932 }
1933
1934 let client_has_x_api_key = headers.contains_key("x-api-key");
1935 let (api_key, api_key_src) = resolve_api_key_with_source(
1936 proxy.service_name,
1937 &selected.upstream.auth,
1938 client_has_x_api_key,
1939 );
1940 if let Some(key) = api_key
1941 && let Ok(v) = HeaderValue::from_str(&key)
1942 {
1943 headers.insert(HeaderName::from_static("x-api-key"), v);
1944 }
1945
1946 let upstream_request_headers = headers.clone();
1947 proxy
1948 .state
1949 .update_request_route(
1950 request_id,
1951 selected.config_name.clone(),
1952 provider_id.clone(),
1953 selected.upstream.base_url.clone(),
1954 )
1955 .await;
1956 let auth_resolution = AuthResolutionLog {
1957 authorization: Some(token_src),
1958 x_api_key: Some(api_key_src),
1959 };
1960
1961 let debug_base = if debug_max > 0 || warn_max > 0 {
1962 Some(HttpDebugBase {
1963 debug_max_body_bytes: debug_max,
1964 warn_max_body_bytes: warn_max,
1965 request_body_len,
1966 upstream_request_body_len,
1967 client_uri: uri.to_string(),
1968 target_url: target_url.to_string(),
1969 client_headers: client_headers_entries_cache
1970 .get_or_init(|| header_map_to_entries(&client_headers))
1971 .clone(),
1972 upstream_request_headers: header_map_to_entries(&upstream_request_headers),
1973 auth_resolution: Some(auth_resolution),
1974 client_body_debug: client_body_debug.clone(),
1975 upstream_request_body_debug: upstream_request_body_debug.clone(),
1976 client_body_warn: client_body_warn.clone(),
1977 upstream_request_body_warn: upstream_request_body_warn.clone(),
1978 })
1979 } else {
1980 None
1981 };
1982
1983 tracing::debug!(
1985 "forwarding {} {} to {} ({})",
1986 method,
1987 uri.path(),
1988 target_url,
1989 selected.config_name
1990 );
1991
1992 let builder = proxy
1993 .client
1994 .request(method.clone(), target_url.clone())
1995 .headers(headers)
1996 .body(filtered_body.clone());
1997
1998 let upstream_start = Instant::now();
1999 let resp = match builder.send().await {
2000 Ok(r) => r,
2001 Err(e) => {
2002 log_retry_trace(serde_json::json!({
2003 "event": "attempt_transport_error",
2004 "service": proxy.service_name,
2005 "request_id": request_id,
2006 "attempt": attempt_index + 1,
2007 "max_attempts": retry_opt.max_attempts,
2008 "strategy": if retry_failover { "failover" } else { "same_upstream" },
2009 "config_name": selected.config_name.as_str(),
2010 "upstream_index": selected.index,
2011 "upstream_base_url": selected.upstream.base_url.as_str(),
2012 "provider_id": provider_id.as_deref(),
2013 "error": format_reqwest_error_for_retry_chain(&e),
2014 }));
2015 if retry_failover {
2016 lb.record_result_with_backoff(
2017 selected.index,
2018 false,
2019 crate::lb::COOLDOWN_SECS,
2020 cooldown_backoff,
2021 );
2022 }
2023 let err_str = format_reqwest_error_for_retry_chain(&e);
2024 upstream_chain.push(format!(
2025 "{}:{} (idx={}) transport_error={} model={}",
2026 selected.config_name,
2027 selected.upstream.base_url,
2028 selected.index,
2029 err_str,
2030 model_note.as_str()
2031 ));
2032 let can_retry = attempt_index + 1 < retry_opt.max_attempts
2033 && should_retry_class(&retry_opt, Some("upstream_transport_error"));
2034 if can_retry {
2035 if retry_failover {
2036 lb.penalize_with_backoff(
2037 selected.index,
2038 retry_opt.transport_cooldown_secs,
2039 "upstream_transport_error",
2040 cooldown_backoff,
2041 );
2042 avoid
2043 .entry(selected.config_name.clone())
2044 .or_default()
2045 .insert(selected.index);
2046 }
2047 backoff_sleep(&retry_opt, attempt_index).await;
2048 continue;
2049 }
2050
2051 if should_retry_class(&retry_opt, Some("upstream_transport_error")) {
2054 lb.penalize_with_backoff(
2055 selected.index,
2056 retry_opt.transport_cooldown_secs,
2057 "upstream_transport_error_final",
2058 cooldown_backoff,
2059 );
2060 }
2061
2062 let dur = start.elapsed().as_millis() as u64;
2063 let status_code = StatusCode::BAD_GATEWAY.as_u16();
2064 let upstream_headers_ms = upstream_start.elapsed().as_millis() as u64;
2065 let retry = retry_info_for_chain(&upstream_chain);
2066 let http_debug_warn = debug_base.as_ref().and_then(|b| {
2067 if b.warn_max_body_bytes == 0 {
2068 return None;
2069 }
2070 Some(HttpDebugLog {
2071 request_body_len: Some(b.request_body_len),
2072 upstream_request_body_len: Some(b.upstream_request_body_len),
2073 upstream_headers_ms: Some(upstream_headers_ms),
2074 upstream_first_chunk_ms: None,
2075 upstream_body_read_ms: None,
2076 upstream_error_class: Some("upstream_transport_error".to_string()),
2077 upstream_error_hint: Some(
2078 "上游连接/发送请求失败(reqwest 错误);请检查网络、DNS、TLS、代理设置或上游可用性。".to_string(),
2079 ),
2080 upstream_cf_ray: None,
2081 client_uri: b.client_uri.clone(),
2082 target_url: b.target_url.clone(),
2083 client_headers: b.client_headers.clone(),
2084 upstream_request_headers: b.upstream_request_headers.clone(),
2085 auth_resolution: b.auth_resolution.clone(),
2086 client_body: b.client_body_warn.clone(),
2087 upstream_request_body: b.upstream_request_body_warn.clone(),
2088 upstream_response_headers: None,
2089 upstream_response_body: None,
2090 upstream_error: Some(err_str.clone()),
2091 })
2092 });
2093 if should_include_http_warn(status_code)
2094 && let Some(h) = http_debug_warn.as_ref()
2095 {
2096 warn_http_debug(status_code, h);
2097 }
2098 let http_debug = if should_include_http_debug(status_code) {
2099 debug_base.as_ref().and_then(|b| {
2100 if b.debug_max_body_bytes == 0 {
2101 return None;
2102 }
2103 Some(HttpDebugLog {
2104 request_body_len: Some(b.request_body_len),
2105 upstream_request_body_len: Some(b.upstream_request_body_len),
2106 upstream_headers_ms: Some(upstream_headers_ms),
2107 upstream_first_chunk_ms: None,
2108 upstream_body_read_ms: None,
2109 upstream_error_class: Some("upstream_transport_error".to_string()),
2110 upstream_error_hint: Some(
2111 "上游连接/发送请求失败(reqwest 错误);请检查网络、DNS、TLS、代理设置或上游可用性。".to_string(),
2112 ),
2113 upstream_cf_ray: None,
2114 client_uri: b.client_uri.clone(),
2115 target_url: b.target_url.clone(),
2116 client_headers: b.client_headers.clone(),
2117 upstream_request_headers: b.upstream_request_headers.clone(),
2118 auth_resolution: b.auth_resolution.clone(),
2119 client_body: b.client_body_debug.clone(),
2120 upstream_request_body: b.upstream_request_body_debug.clone(),
2121 upstream_response_headers: None,
2122 upstream_response_body: None,
2123 upstream_error: Some(err_str.clone()),
2124 })
2125 })
2126 } else if should_include_http_warn(status_code) {
2127 http_debug_warn.clone()
2128 } else {
2129 None
2130 };
2131 log_request_with_debug(
2132 proxy.service_name,
2133 method.as_str(),
2134 uri.path(),
2135 status_code,
2136 dur,
2137 None,
2138 &selected.config_name,
2139 selected.upstream.tags.get("provider_id").cloned(),
2140 &selected.upstream.base_url,
2141 session_id.clone(),
2142 cwd.clone(),
2143 effective_effort.clone(),
2144 None,
2145 retry.clone(),
2146 http_debug,
2147 );
2148 proxy
2149 .state
2150 .finish_request(crate::state::FinishRequestParams {
2151 id: request_id,
2152 status_code,
2153 duration_ms: dur,
2154 ended_at_ms: started_at_ms + dur,
2155 usage: None,
2156 retry,
2157 ttfb_ms: None,
2158 })
2159 .await;
2160 return Err((StatusCode::BAD_GATEWAY, e.to_string()));
2161 }
2162 };
2163
2164 let upstream_headers_ms = upstream_start.elapsed().as_millis() as u64;
2165 let status = resp.status();
2166 let success = status.is_success();
2167 let resp_headers = resp.headers().clone();
2168 let resp_headers_filtered = filter_response_headers(&resp_headers);
2169
2170 if is_stream && success {
2173 lb.record_result_with_backoff(
2174 selected.index,
2175 true,
2176 crate::lb::COOLDOWN_SECS,
2177 cooldown_backoff,
2178 );
2179 upstream_chain.push(format!(
2180 "{} (idx={}) status={} model={}",
2181 selected.upstream.base_url,
2182 selected.index,
2183 status.as_u16(),
2184 model_note.as_str()
2185 ));
2186 let retry = retry_info_for_chain(&upstream_chain);
2187
2188 return Ok(build_sse_success_response(
2189 &proxy,
2190 lb.clone(),
2191 selected,
2192 resp,
2193 SseSuccessMeta {
2194 status,
2195 resp_headers,
2196 resp_headers_filtered,
2197 start,
2198 started_at_ms,
2199 upstream_start,
2200 upstream_headers_ms,
2201 request_body_len,
2202 upstream_request_body_len,
2203 debug_base,
2204 retry,
2205 session_id: session_id.clone(),
2206 cwd: cwd.clone(),
2207 effective_effort: effective_effort.clone(),
2208 request_id,
2209 is_user_turn,
2210 is_codex_service,
2211 transport_cooldown_secs: retry_opt.transport_cooldown_secs,
2212 cooldown_backoff,
2213 method: method.clone(),
2214 path: uri.path().to_string(),
2215 },
2216 )
2217 .await);
2218 } else {
2219 let bytes = match resp.bytes().await {
2220 Ok(b) => b,
2221 Err(e) => {
2222 log_retry_trace(serde_json::json!({
2223 "event": "attempt_body_read_error",
2224 "service": proxy.service_name,
2225 "request_id": request_id,
2226 "attempt": attempt_index + 1,
2227 "max_attempts": retry_opt.max_attempts,
2228 "strategy": if retry_failover { "failover" } else { "same_upstream" },
2229 "config_name": selected.config_name.as_str(),
2230 "upstream_index": selected.index,
2231 "upstream_base_url": selected.upstream.base_url.as_str(),
2232 "provider_id": provider_id.as_deref(),
2233 "error": format_reqwest_error_for_retry_chain(&e),
2234 }));
2235 if retry_failover {
2236 lb.record_result_with_backoff(
2237 selected.index,
2238 false,
2239 crate::lb::COOLDOWN_SECS,
2240 cooldown_backoff,
2241 );
2242 }
2243 let err_str = format_reqwest_error_for_retry_chain(&e);
2244 upstream_chain.push(format!(
2245 "{}:{} (idx={}) body_read_error={} model={}",
2246 selected.config_name,
2247 selected.upstream.base_url,
2248 selected.index,
2249 err_str,
2250 model_note.as_str()
2251 ));
2252 let can_retry = attempt_index + 1 < retry_opt.max_attempts
2253 && should_retry_class(&retry_opt, Some("upstream_transport_error"));
2254 if can_retry {
2255 if retry_failover {
2256 lb.penalize_with_backoff(
2257 selected.index,
2258 retry_opt.transport_cooldown_secs,
2259 "upstream_body_read_error",
2260 cooldown_backoff,
2261 );
2262 avoid
2263 .entry(selected.config_name.clone())
2264 .or_default()
2265 .insert(selected.index);
2266 }
2267 backoff_sleep(&retry_opt, attempt_index).await;
2268 continue;
2269 }
2270
2271 if should_retry_class(&retry_opt, Some("upstream_transport_error")) {
2274 lb.penalize_with_backoff(
2275 selected.index,
2276 retry_opt.transport_cooldown_secs,
2277 "upstream_body_read_error_final",
2278 cooldown_backoff,
2279 );
2280 }
2281
2282 let dur = start.elapsed().as_millis() as u64;
2283 let status = StatusCode::BAD_GATEWAY;
2284 let http_debug = if should_include_http_warn(status.as_u16())
2285 && let Some(b) = debug_base.as_ref()
2286 {
2287 Some(HttpDebugLog {
2288 request_body_len: Some(b.request_body_len),
2289 upstream_request_body_len: Some(b.upstream_request_body_len),
2290 upstream_headers_ms: Some(upstream_headers_ms),
2291 upstream_first_chunk_ms: None,
2292 upstream_body_read_ms: None,
2293 upstream_error_class: Some("upstream_transport_error".to_string()),
2294 upstream_error_hint: Some(
2295 "读取上游响应 body 失败(连接中断/解码错误等);可视为传输错误。"
2296 .to_string(),
2297 ),
2298 upstream_cf_ray: None,
2299 client_uri: b.client_uri.clone(),
2300 target_url: b.target_url.clone(),
2301 client_headers: b.client_headers.clone(),
2302 upstream_request_headers: b.upstream_request_headers.clone(),
2303 auth_resolution: b.auth_resolution.clone(),
2304 client_body: b.client_body_warn.clone(),
2305 upstream_request_body: b.upstream_request_body_warn.clone(),
2306 upstream_response_headers: Some(header_map_to_entries(&resp_headers)),
2307 upstream_response_body: None,
2308 upstream_error: Some(err_str.clone()),
2309 })
2310 } else {
2311 None
2312 };
2313 log_request_with_debug(
2314 proxy.service_name,
2315 method.as_str(),
2316 uri.path(),
2317 status.as_u16(),
2318 dur,
2319 Some(upstream_headers_ms),
2320 &selected.config_name,
2321 selected.upstream.tags.get("provider_id").cloned(),
2322 &selected.upstream.base_url,
2323 session_id.clone(),
2324 cwd.clone(),
2325 effective_effort.clone(),
2326 None,
2327 retry_info_for_chain(&upstream_chain),
2328 http_debug,
2329 );
2330 let retry = retry_info_for_chain(&upstream_chain);
2331 proxy
2332 .state
2333 .finish_request(crate::state::FinishRequestParams {
2334 id: request_id,
2335 status_code: status.as_u16(),
2336 duration_ms: dur,
2337 ended_at_ms: started_at_ms + dur,
2338 usage: None,
2339 retry,
2340 ttfb_ms: Some(upstream_headers_ms),
2341 })
2342 .await;
2343 return Err((status, err_str));
2344 }
2345 };
2346 let upstream_body_read_ms = upstream_start.elapsed().as_millis() as u64;
2347 let dur = start.elapsed().as_millis() as u64;
2348 let usage = extract_usage_from_bytes(&bytes);
2349 let status_code = status.as_u16();
2350 let (cls, hint, cf_ray) =
2351 classify_upstream_response(status_code, &resp_headers, bytes.as_ref());
2352 let never_retry = should_never_retry_status(&retry_opt, status_code)
2353 || should_never_retry_class(&retry_opt, cls.as_deref());
2354
2355 upstream_chain.push(format!(
2356 "{} (idx={}) status={} class={} model={}",
2357 selected.upstream.base_url,
2358 selected.index,
2359 status_code,
2360 cls.as_deref().unwrap_or("-"),
2361 model_note.as_str()
2362 ));
2363
2364 if !success
2368 && attempt_index + 1 >= retry_opt.max_attempts
2369 && !never_retry
2370 && (should_retry_status(&retry_opt, status_code)
2371 || should_retry_class(&retry_opt, cls.as_deref()))
2372 {
2373 log_retry_trace(serde_json::json!({
2374 "event": "attempt_final_retryable_failure",
2375 "service": proxy.service_name,
2376 "request_id": request_id,
2377 "attempt": attempt_index + 1,
2378 "max_attempts": retry_opt.max_attempts,
2379 "status_code": status_code,
2380 "class": cls.as_deref(),
2381 "hint": hint.as_deref(),
2382 "cf_ray": cf_ray.as_deref(),
2383 "config_name": selected.config_name.as_str(),
2384 "upstream_index": selected.index,
2385 "upstream_base_url": selected.upstream.base_url.as_str(),
2386 "provider_id": provider_id.as_deref(),
2387 "should_retry_status": should_retry_status(&retry_opt, status_code),
2388 "should_retry_class": should_retry_class(&retry_opt, cls.as_deref()),
2389 "never_retry_status": should_never_retry_status(&retry_opt, status_code),
2390 "never_retry_class": should_never_retry_class(&retry_opt, cls.as_deref()),
2391 }));
2392 match cls.as_deref() {
2393 Some("cloudflare_challenge") => lb.penalize_with_backoff(
2394 selected.index,
2395 retry_opt.cloudflare_challenge_cooldown_secs,
2396 "cloudflare_challenge_final",
2397 cooldown_backoff,
2398 ),
2399 Some("cloudflare_timeout") => lb.penalize_with_backoff(
2400 selected.index,
2401 retry_opt.cloudflare_timeout_cooldown_secs,
2402 "cloudflare_timeout_final",
2403 cooldown_backoff,
2404 ),
2405 _ if status_code >= 400 => lb.penalize_with_backoff(
2406 selected.index,
2407 retry_opt.transport_cooldown_secs,
2408 &format!("status_{}_final", status_code),
2409 cooldown_backoff,
2410 ),
2411 _ => {}
2412 }
2413 }
2414
2415 let retryable = !status.is_success()
2416 && attempt_index + 1 < retry_opt.max_attempts
2417 && !never_retry
2418 && (should_retry_status(&retry_opt, status_code)
2419 || should_retry_class(&retry_opt, cls.as_deref()));
2420 if retryable {
2421 log_retry_trace(serde_json::json!({
2422 "event": "attempt_retryable_failure",
2423 "service": proxy.service_name,
2424 "request_id": request_id,
2425 "attempt": attempt_index + 1,
2426 "next_attempt": attempt_index + 2,
2427 "max_attempts": retry_opt.max_attempts,
2428 "status_code": status_code,
2429 "class": cls.as_deref(),
2430 "hint": hint.as_deref(),
2431 "cf_ray": cf_ray.as_deref(),
2432 "config_name": selected.config_name.as_str(),
2433 "upstream_index": selected.index,
2434 "upstream_base_url": selected.upstream.base_url.as_str(),
2435 "provider_id": provider_id.as_deref(),
2436 "strategy": if retry_failover { "failover" } else { "same_upstream" },
2437 "retry_after": resp_headers.get("retry-after").and_then(|v| v.to_str().ok()),
2438 "should_retry_status": should_retry_status(&retry_opt, status_code),
2439 "should_retry_class": should_retry_class(&retry_opt, cls.as_deref()),
2440 "never_retry_status": should_never_retry_status(&retry_opt, status_code),
2441 "never_retry_class": should_never_retry_class(&retry_opt, cls.as_deref()),
2442 }));
2443 let cls_s = cls.as_deref().unwrap_or("-");
2444 info!(
2445 "retrying after non-2xx status {} (class={}) for {} {} (config: {}, mode={}, next_attempt={}/{})",
2446 status_code,
2447 cls_s,
2448 method,
2449 uri.path(),
2450 selected.config_name,
2451 if retry_failover {
2452 "failover"
2453 } else {
2454 "same_upstream"
2455 },
2456 attempt_index + 2,
2457 retry_opt.max_attempts
2458 );
2459 if retry_failover {
2460 if status_code >= 500 || cls.is_some() {
2462 lb.record_result_with_backoff(
2463 selected.index,
2464 false,
2465 crate::lb::COOLDOWN_SECS,
2466 cooldown_backoff,
2467 );
2468 }
2469 match cls.as_deref() {
2470 Some("cloudflare_challenge") => lb.penalize_with_backoff(
2471 selected.index,
2472 retry_opt.cloudflare_challenge_cooldown_secs,
2473 "cloudflare_challenge",
2474 cooldown_backoff,
2475 ),
2476 Some("cloudflare_timeout") => lb.penalize_with_backoff(
2477 selected.index,
2478 retry_opt.cloudflare_timeout_cooldown_secs,
2479 "cloudflare_timeout",
2480 cooldown_backoff,
2481 ),
2482 _ if status_code >= 400 => lb.penalize_with_backoff(
2483 selected.index,
2484 retry_opt.transport_cooldown_secs,
2485 &format!("status_{}", status_code),
2486 cooldown_backoff,
2487 ),
2488 _ => {}
2489 }
2490 avoid
2491 .entry(selected.config_name.clone())
2492 .or_default()
2493 .insert(selected.index);
2494 }
2495 let skip_sleep = status_code >= 400 && status_code < 500 && status_code != 429;
2496 if !skip_sleep {
2497 retry_sleep(&retry_opt, attempt_index, &resp_headers).await;
2498 }
2499 continue;
2500 }
2501
2502 if success {
2508 lb.record_result_with_backoff(
2509 selected.index,
2510 true,
2511 crate::lb::COOLDOWN_SECS,
2512 cooldown_backoff,
2513 );
2514 } else if status_code >= 500 || cls.is_some() {
2515 lb.record_result_with_backoff(
2516 selected.index,
2517 false,
2518 crate::lb::COOLDOWN_SECS,
2519 cooldown_backoff,
2520 );
2521 }
2522
2523 let retry = retry_info_for_chain(&upstream_chain);
2524
2525 if is_user_turn {
2526 let provider_id = selected
2527 .upstream
2528 .tags
2529 .get("provider_id")
2530 .map(|s| s.as_str())
2531 .unwrap_or("-");
2532 info!(
2533 "user turn {} {} using config '{}' upstream[{}] provider_id='{}' base_url='{}'",
2534 method,
2535 uri.path(),
2536 selected.config_name,
2537 selected.index,
2538 provider_id,
2539 selected.upstream.base_url
2540 );
2541 }
2542
2543 let http_debug_warn = if should_include_http_warn(status_code)
2544 && let Some(b) = debug_base.as_ref()
2545 {
2546 let max = b.warn_max_body_bytes;
2547 let resp_ct = resp_headers
2548 .get("content-type")
2549 .and_then(|v| v.to_str().ok());
2550 Some(HttpDebugLog {
2551 request_body_len: Some(b.request_body_len),
2552 upstream_request_body_len: Some(b.upstream_request_body_len),
2553 upstream_headers_ms: Some(upstream_headers_ms),
2554 upstream_first_chunk_ms: None,
2555 upstream_body_read_ms: Some(upstream_body_read_ms),
2556 upstream_error_class: cls.clone(),
2557 upstream_error_hint: hint.clone(),
2558 upstream_cf_ray: cf_ray.clone(),
2559 client_uri: b.client_uri.clone(),
2560 target_url: b.target_url.clone(),
2561 client_headers: b.client_headers.clone(),
2562 upstream_request_headers: b.upstream_request_headers.clone(),
2563 auth_resolution: b.auth_resolution.clone(),
2564 client_body: b.client_body_warn.clone(),
2565 upstream_request_body: b.upstream_request_body_warn.clone(),
2566 upstream_response_headers: Some(header_map_to_entries(&resp_headers)),
2567 upstream_response_body: Some(make_body_preview(
2568 bytes.as_ref(),
2569 resp_ct,
2570 max,
2571 )),
2572 upstream_error: None,
2573 })
2574 } else {
2575 None
2576 };
2577
2578 if !status.is_success() {
2579 if let Some(h) = http_debug_warn.as_ref() {
2580 warn_http_debug(status_code, h);
2581 } else {
2582 let cls_s = cls.as_deref().unwrap_or("-");
2583 let cf_ray_s = cf_ray.as_deref().unwrap_or("-");
2584 warn!(
2585 "upstream returned non-2xx status {} (class={}, cf_ray={}) for {} {} (config: {}); set CODEX_HELPER_HTTP_WARN=0 to disable preview logs (or CODEX_HELPER_HTTP_DEBUG=1 for full debug)",
2586 status_code,
2587 cls_s,
2588 cf_ray_s,
2589 method,
2590 uri.path(),
2591 selected.config_name
2592 );
2593 }
2594 }
2595
2596 let http_debug = if should_include_http_debug(status_code) {
2597 debug_base.map(|b| {
2598 let max = b.debug_max_body_bytes;
2599 let resp_ct = resp_headers
2600 .get("content-type")
2601 .and_then(|v| v.to_str().ok());
2602 HttpDebugLog {
2603 request_body_len: Some(b.request_body_len),
2604 upstream_request_body_len: Some(b.upstream_request_body_len),
2605 upstream_headers_ms: Some(upstream_headers_ms),
2606 upstream_first_chunk_ms: None,
2607 upstream_body_read_ms: Some(upstream_body_read_ms),
2608 upstream_error_class: cls,
2609 upstream_error_hint: hint,
2610 upstream_cf_ray: cf_ray,
2611 client_uri: b.client_uri,
2612 target_url: b.target_url,
2613 client_headers: b.client_headers,
2614 upstream_request_headers: b.upstream_request_headers,
2615 auth_resolution: b.auth_resolution,
2616 client_body: b.client_body_debug,
2617 upstream_request_body: b.upstream_request_body_debug,
2618 upstream_response_headers: Some(header_map_to_entries(&resp_headers)),
2619 upstream_response_body: Some(make_body_preview(
2620 bytes.as_ref(),
2621 resp_ct,
2622 max,
2623 )),
2624 upstream_error: None,
2625 }
2626 })
2627 } else if should_include_http_warn(status_code) {
2628 http_debug_warn.clone()
2629 } else {
2630 None
2631 };
2632
2633 log_request_with_debug(
2634 proxy.service_name,
2635 method.as_str(),
2636 uri.path(),
2637 status_code,
2638 dur,
2639 Some(upstream_headers_ms),
2640 &selected.config_name,
2641 selected.upstream.tags.get("provider_id").cloned(),
2642 &selected.upstream.base_url,
2643 session_id.clone(),
2644 cwd.clone(),
2645 effective_effort.clone(),
2646 usage.clone(),
2647 retry.clone(),
2648 http_debug,
2649 );
2650 proxy
2651 .state
2652 .finish_request(crate::state::FinishRequestParams {
2653 id: request_id,
2654 status_code,
2655 duration_ms: dur,
2656 ended_at_ms: started_at_ms + dur,
2657 usage: usage.clone(),
2658 retry,
2659 ttfb_ms: Some(upstream_headers_ms),
2660 })
2661 .await;
2662
2663 if is_user_turn && is_codex_service {
2665 usage_providers::poll_for_codex_upstream(
2666 cfg_snapshot.clone(),
2667 proxy.lb_states.clone(),
2668 &selected.config_name,
2669 selected.index,
2670 )
2671 .await;
2672 }
2673
2674 let mut builder = Response::builder().status(status);
2675 for (name, value) in resp_headers_filtered.iter() {
2676 builder = builder.header(name, value);
2677 }
2678 return Ok(builder.body(Body::from(bytes)).unwrap());
2679 }
2680 }
2681
2682 let dur = start.elapsed().as_millis() as u64;
2683 let status = StatusCode::BAD_GATEWAY;
2684 let http_debug = if should_include_http_warn(status.as_u16()) {
2685 let client_headers_entries = client_headers_entries_cache
2686 .get_or_init(|| header_map_to_entries(&client_headers))
2687 .clone();
2688 Some(HttpDebugLog {
2689 request_body_len: Some(request_body_len),
2690 upstream_request_body_len: None,
2691 upstream_headers_ms: None,
2692 upstream_first_chunk_ms: None,
2693 upstream_body_read_ms: None,
2694 upstream_error_class: Some("retry_exhausted".to_string()),
2695 upstream_error_hint: Some("所有重试尝试均未能返回可用响应。".to_string()),
2696 upstream_cf_ray: None,
2697 client_uri: uri.to_string(),
2698 target_url: "-".to_string(),
2699 client_headers: client_headers_entries,
2700 upstream_request_headers: Vec::new(),
2701 auth_resolution: None,
2702 client_body: client_body_warn.clone(),
2703 upstream_request_body: None,
2704 upstream_response_headers: None,
2705 upstream_response_body: None,
2706 upstream_error: Some(format!(
2707 "retry attempts exhausted; chain={:?}",
2708 upstream_chain
2709 )),
2710 })
2711 } else {
2712 None
2713 };
2714 log_request_with_debug(
2715 proxy.service_name,
2716 method.as_str(),
2717 uri.path(),
2718 status.as_u16(),
2719 dur,
2720 None,
2721 "-",
2722 None,
2723 "-",
2724 session_id.clone(),
2725 cwd.clone(),
2726 effective_effort.clone(),
2727 None,
2728 retry_info_for_chain(&upstream_chain),
2729 http_debug,
2730 );
2731 let retry = retry_info_for_chain(&upstream_chain);
2732 proxy
2733 .state
2734 .finish_request(crate::state::FinishRequestParams {
2735 id: request_id,
2736 status_code: status.as_u16(),
2737 duration_ms: dur,
2738 ended_at_ms: started_at_ms + dur,
2739 usage: None,
2740 retry,
2741 ttfb_ms: None,
2742 })
2743 .await;
2744 Err((status, "retry attempts exhausted".to_string()))
2745 }
2746}
2747
2748pub fn router(proxy: ProxyService) -> Router {
/// JSON body deserialized by `set_session_override` (session effort override).
2749 #[derive(serde::Deserialize)]
2751 struct SessionOverrideRequest {
/// Session to target; the handler answers 400 when this is blank after trimming.
2752 session_id: String,
/// Effort value to store for the session. When absent, the handler clears the
/// session's existing effort override instead of setting one.
2753 effort: Option<String>,
2754 }
2755
/// JSON body deserialized by `set_session_config_override`.
2756 #[derive(serde::Deserialize)]
2757 struct SessionConfigOverrideRequest {
/// Session to target; the handler answers 400 when this is blank after trimming.
2758 session_id: String,
/// Config name to pin for the session. When absent, the handler clears the
/// session's existing config override instead of setting one.
2759 config_name: Option<String>,
2760 }
2761
/// JSON body deserialized by `set_global_config_override`.
2762 #[derive(serde::Deserialize)]
2763 struct GlobalConfigOverrideRequest {
/// Config name to install as the global override. When absent, the handler
/// clears the global override; a blank value is rejected with 400.
2764 config_name: Option<String>,
2765 }
2766
/// Serialized description of the runtime configuration, returned by
/// `runtime_config_status` and embedded in `ReloadResult`.
2767 #[derive(serde::Serialize)]
2768 struct RuntimeConfigStatus {
/// Display form of `crate::config::config_file_path()`.
2769 config_path: String,
/// Value of `proxy.config.last_loaded_at_ms()`.
2770 loaded_at_ms: u64,
/// Value of `proxy.config.last_mtime_ms()`; presumably the config file's
/// modification time, `None` when unavailable (confirm against RuntimeConfig).
2771 source_mtime_ms: Option<u64>,
/// Retry settings obtained by calling `resolve()` on the snapshot's `retry` field.
2772 retry: crate::config::ResolvedRetryConfig,
2773 }
2774
/// Serialized response of the `api_capabilities` handler.
2775 #[derive(serde::Serialize)]
2776 struct ApiCapabilities {
/// API version number; the handler currently reports 1.
2777 api_version: u32,
/// Copied from `proxy.service_name`.
2778 service_name: &'static str,
/// Paths of the v1 endpoints advertised to clients.
2779 endpoints: Vec<&'static str>,
2780 }
2781
/// Serialized response of the `reload_runtime_config` handler.
2782 #[derive(serde::Serialize)]
2783 struct ReloadResult {
/// The boolean returned by `force_reload_from_disk()` (bound as `changed` in the handler).
2784 reloaded: bool,
/// Runtime configuration status gathered after the reload attempt.
2785 status: RuntimeConfigStatus,
2786 }
2787
2788 async fn runtime_config_status(
2789 proxy: ProxyService,
2790 ) -> Result<Json<RuntimeConfigStatus>, (StatusCode, String)> {
2791 let cfg = proxy.config.snapshot().await;
2792 Ok(Json(RuntimeConfigStatus {
2793 config_path: crate::config::config_file_path().display().to_string(),
2794 loaded_at_ms: proxy.config.last_loaded_at_ms(),
2795 source_mtime_ms: proxy.config.last_mtime_ms().await,
2796 retry: cfg.retry.resolve(),
2797 }))
2798 }
2799
2800 async fn reload_runtime_config(
2801 proxy: ProxyService,
2802 ) -> Result<Json<ReloadResult>, (StatusCode, String)> {
2803 let changed = proxy
2804 .config
2805 .force_reload_from_disk()
2806 .await
2807 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2808 let cfg = proxy.config.snapshot().await;
2809 Ok(Json(ReloadResult {
2810 reloaded: changed,
2811 status: RuntimeConfigStatus {
2812 config_path: crate::config::config_file_path().display().to_string(),
2813 loaded_at_ms: proxy.config.last_loaded_at_ms(),
2814 source_mtime_ms: proxy.config.last_mtime_ms().await,
2815 retry: cfg.retry.resolve(),
2816 },
2817 }))
2818 }
2819
2820 async fn set_session_override(
2821 proxy: ProxyService,
2822 Json(payload): Json<SessionOverrideRequest>,
2823 ) -> Result<StatusCode, (StatusCode, String)> {
2824 if payload.session_id.trim().is_empty() {
2825 return Err((
2826 StatusCode::BAD_REQUEST,
2827 "session_id is required".to_string(),
2828 ));
2829 }
2830 if let Some(effort) = payload.effort {
2831 if effort.trim().is_empty() {
2832 return Err((StatusCode::BAD_REQUEST, "effort is empty".to_string()));
2833 }
2834 proxy
2835 .state
2836 .set_session_effort_override(
2837 payload.session_id,
2838 effort,
2839 std::time::SystemTime::now()
2840 .duration_since(std::time::UNIX_EPOCH)
2841 .map(|d| d.as_millis() as u64)
2842 .unwrap_or(0),
2843 )
2844 .await;
2845 } else {
2846 proxy
2847 .state
2848 .clear_session_effort_override(payload.session_id.as_str())
2849 .await;
2850 }
2851 Ok(StatusCode::NO_CONTENT)
2852 }
2853
2854 async fn list_session_overrides(
2855 proxy: ProxyService,
2856 ) -> Result<Json<std::collections::HashMap<String, String>>, (StatusCode, String)> {
2857 let map = proxy.state.list_session_effort_overrides().await;
2858 Ok(Json(map))
2859 }
2860
2861 async fn set_session_config_override(
2862 proxy: ProxyService,
2863 Json(payload): Json<SessionConfigOverrideRequest>,
2864 ) -> Result<StatusCode, (StatusCode, String)> {
2865 if payload.session_id.trim().is_empty() {
2866 return Err((
2867 StatusCode::BAD_REQUEST,
2868 "session_id is required".to_string(),
2869 ));
2870 }
2871 if let Some(config_name) = payload.config_name {
2872 if config_name.trim().is_empty() {
2873 return Err((StatusCode::BAD_REQUEST, "config_name is empty".to_string()));
2874 }
2875 proxy
2876 .state
2877 .set_session_config_override(
2878 payload.session_id,
2879 config_name,
2880 std::time::SystemTime::now()
2881 .duration_since(std::time::UNIX_EPOCH)
2882 .map(|d| d.as_millis() as u64)
2883 .unwrap_or(0),
2884 )
2885 .await;
2886 } else {
2887 proxy
2888 .state
2889 .clear_session_config_override(payload.session_id.as_str())
2890 .await;
2891 }
2892 Ok(StatusCode::NO_CONTENT)
2893 }
2894
2895 async fn list_session_config_overrides(
2896 proxy: ProxyService,
2897 ) -> Result<Json<std::collections::HashMap<String, String>>, (StatusCode, String)> {
2898 let map = proxy.state.list_session_config_overrides().await;
2899 Ok(Json(map))
2900 }
2901
2902 async fn get_global_config_override(
2903 proxy: ProxyService,
2904 ) -> Result<Json<Option<String>>, (StatusCode, String)> {
2905 Ok(Json(proxy.state.get_global_config_override().await))
2906 }
2907
2908 async fn set_global_config_override(
2909 proxy: ProxyService,
2910 Json(payload): Json<GlobalConfigOverrideRequest>,
2911 ) -> Result<StatusCode, (StatusCode, String)> {
2912 if let Some(config_name) = payload.config_name {
2913 if config_name.trim().is_empty() {
2914 return Err((StatusCode::BAD_REQUEST, "config_name is empty".to_string()));
2915 }
2916 proxy
2917 .state
2918 .set_global_config_override(
2919 config_name,
2920 std::time::SystemTime::now()
2921 .duration_since(std::time::UNIX_EPOCH)
2922 .map(|d| d.as_millis() as u64)
2923 .unwrap_or(0),
2924 )
2925 .await;
2926 } else {
2927 proxy.state.clear_global_config_override().await;
2928 }
2929 Ok(StatusCode::NO_CONTENT)
2930 }
2931
2932 async fn list_active_requests(
2933 proxy: ProxyService,
2934 ) -> Result<Json<Vec<ActiveRequest>>, (StatusCode, String)> {
2935 let vec = proxy.state.list_active_requests().await;
2936 Ok(Json(vec))
2937 }
2938
2939 async fn list_session_stats(
2940 proxy: ProxyService,
2941 ) -> Result<
2942 Json<std::collections::HashMap<String, crate::state::SessionStats>>,
2943 (StatusCode, String),
2944 > {
2945 let map = proxy.state.list_session_stats().await;
2946 Ok(Json(map))
2947 }
2948
/// Query-string parameters deserialized by `list_recent_finished`.
2949 #[derive(serde::Deserialize)]
2950 struct RecentQuery {
/// Maximum number of finished requests to return; the handler defaults this
/// to 50 and clamps it to the range 1..=200.
2951 limit: Option<usize>,
2952 }
2953
2954 async fn list_recent_finished(
2955 proxy: ProxyService,
2956 Query(q): Query<RecentQuery>,
2957 ) -> Result<Json<Vec<FinishedRequest>>, (StatusCode, String)> {
2958 let limit = q.limit.unwrap_or(50).clamp(1, 200);
2959 let vec = proxy.state.list_recent_finished(limit).await;
2960 Ok(Json(vec))
2961 }
2962
2963 async fn api_capabilities(
2964 proxy: ProxyService,
2965 ) -> Result<Json<ApiCapabilities>, (StatusCode, String)> {
2966 Ok(Json(ApiCapabilities {
2967 api_version: 1,
2968 service_name: proxy.service_name,
2969 endpoints: vec![
2970 "/__codex_helper/api/v1/capabilities",
2971 "/__codex_helper/api/v1/snapshot",
2972 "/__codex_helper/api/v1/status/active",
2973 "/__codex_helper/api/v1/status/recent",
2974 "/__codex_helper/api/v1/status/session-stats",
2975 "/__codex_helper/api/v1/status/health-checks",
2976 "/__codex_helper/api/v1/status/config-health",
2977 "/__codex_helper/api/v1/config/runtime",
2978 "/__codex_helper/api/v1/config/reload",
2979 "/__codex_helper/api/v1/configs",
2980 "/__codex_helper/api/v1/overrides/session/effort",
2981 "/__codex_helper/api/v1/overrides/session/config",
2982 "/__codex_helper/api/v1/overrides/global-config",
2983 "/__codex_helper/api/v1/healthcheck/start",
2984 "/__codex_helper/api/v1/healthcheck/cancel",
2985 ],
2986 }))
2987 }
2988
/// Query-string parameters deserialized by `api_v1_snapshot`.
2989 #[derive(serde::Deserialize)]
2990 struct SnapshotQuery {
/// Recent-request limit; the handler defaults this to 200 and clamps it to 1..=2000.
2991 recent_limit: Option<usize>,
/// Number of days of stats; the handler defaults this to 21 and clamps it to 1..=365.
2992 stats_days: Option<usize>,
2993 }
2994
2995 async fn api_v1_snapshot(
2996 proxy: ProxyService,
2997 Query(q): Query<SnapshotQuery>,
2998 ) -> Result<Json<crate::dashboard_core::ApiV1Snapshot>, (StatusCode, String)> {
2999 let recent_limit = q.recent_limit.unwrap_or(200).clamp(1, 2_000);
3000 let stats_days = q.stats_days.unwrap_or(21).clamp(1, 365);
3001
3002 let cfg = proxy.config.snapshot().await;
3003 let mgr = match proxy.service_name {
3004 "claude" => &cfg.claude,
3005 _ => &cfg.codex,
3006 };
3007 let mut configs = mgr
3008 .configs
3009 .iter()
3010 .map(|(name, c)| crate::dashboard_core::ConfigOption {
3011 name: name.clone(),
3012 alias: c.alias.clone(),
3013 enabled: c.enabled,
3014 level: c.level.clamp(1, 10),
3015 })
3016 .collect::<Vec<_>>();
3017 configs.sort_by(|a, b| a.level.cmp(&b.level).then_with(|| a.name.cmp(&b.name)));
3018
3019 let snapshot = crate::dashboard_core::build_dashboard_snapshot(
3020 &proxy.state,
3021 proxy.service_name,
3022 recent_limit,
3023 stats_days,
3024 )
3025 .await;
3026
3027 Ok(Json(crate::dashboard_core::ApiV1Snapshot {
3028 api_version: 1,
3029 service_name: proxy.service_name.to_string(),
3030 runtime_loaded_at_ms: Some(proxy.config.last_loaded_at_ms()),
3031 runtime_source_mtime_ms: proxy.config.last_mtime_ms().await,
3032 configs,
3033 snapshot,
3034 }))
3035 }
3036
3037 async fn list_health_checks(
3038 proxy: ProxyService,
3039 ) -> Result<
3040 Json<std::collections::HashMap<String, crate::state::HealthCheckStatus>>,
3041 (StatusCode, String),
3042 > {
3043 let map = proxy.state.list_health_checks(proxy.service_name).await;
3044 Ok(Json(map))
3045 }
3046
3047 async fn list_config_health(
3048 proxy: ProxyService,
3049 ) -> Result<
3050 Json<std::collections::HashMap<String, crate::state::ConfigHealth>>,
3051 (StatusCode, String),
3052 > {
3053 let map = proxy.state.get_config_health(proxy.service_name).await;
3054 Ok(Json(map))
3055 }
3056
/// JSON body shared by `start_health_checks` and `cancel_health_checks`.
/// Both fields fall back to their default when omitted (`#[serde(default)]`).
3057 #[derive(Debug, serde::Deserialize)]
3058 struct HealthCheckAction {
/// When true, the handlers target every config of the current service and
/// ignore `config_names`.
3059 #[serde(default)]
3060 all: bool,
/// Explicit config names to target when `all` is false; the handlers drop
/// blank entries and duplicates.
3061 #[serde(default)]
3062 config_names: Vec<String>,
3063 }
3064
/// Serialized response shared by `start_health_checks` and
/// `cancel_health_checks`; each handler fills only the lists relevant to it
/// and leaves the others empty.
3065 #[derive(Debug, serde::Serialize)]
3066 struct HealthCheckActionResult {
/// Start: configs for which a health-check task was spawned.
3067 started: Vec<String>,
/// Start: configs for which `try_begin_health_check` returned false.
3068 already_running: Vec<String>,
/// Start and cancel: requested names not found among the service's configs.
3069 missing: Vec<String>,
/// Cancel: configs for which `request_cancel_health_check` returned true.
3070 cancel_requested: Vec<String>,
/// Cancel: configs for which `request_cancel_health_check` returned false.
3071 not_running: Vec<String>,
3072 }
3073
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
3080
3081 async fn start_health_checks(
3082 proxy: ProxyService,
3083 Json(payload): Json<HealthCheckAction>,
3084 ) -> Result<Json<HealthCheckActionResult>, (StatusCode, String)> {
3085 let cfg = proxy.config.snapshot().await;
3086 let mgr = match proxy.service_name {
3087 "claude" => &cfg.claude,
3088 _ => &cfg.codex,
3089 };
3090
3091 let mut targets = if payload.all {
3092 mgr.configs.keys().cloned().collect::<Vec<_>>()
3093 } else {
3094 payload.config_names
3095 };
3096 targets.retain(|s| !s.trim().is_empty());
3097 targets.sort();
3098 targets.dedup();
3099 if targets.is_empty() {
3100 return Err((
3101 StatusCode::BAD_REQUEST,
3102 "expected { all: true } or non-empty config_names".to_string(),
3103 ));
3104 }
3105
3106 let mut started = Vec::new();
3107 let mut already_running = Vec::new();
3108 let mut missing = Vec::new();
3109 for name in targets {
3110 let Some(svc) = mgr.configs.get(&name) else {
3111 missing.push(name);
3112 continue;
3113 };
3114
3115 let upstreams = svc.upstreams.clone();
3116 let now = now_ms();
3117 if !proxy
3118 .state
3119 .try_begin_health_check(proxy.service_name, &name, upstreams.len(), now)
3120 .await
3121 {
3122 already_running.push(name);
3123 continue;
3124 }
3125
3126 proxy
3127 .state
3128 .record_config_health(
3129 proxy.service_name,
3130 name.clone(),
3131 crate::state::ConfigHealth {
3132 checked_at_ms: now,
3133 upstreams: Vec::new(),
3134 },
3135 )
3136 .await;
3137
3138 let state = proxy.state.clone();
3139 let service_name = proxy.service_name;
3140 let config_name = name.clone();
3141 tokio::spawn(async move {
3142 crate::healthcheck::run_health_check_for_config(
3143 state,
3144 service_name,
3145 config_name,
3146 upstreams,
3147 )
3148 .await;
3149 });
3150 started.push(name);
3151 }
3152
3153 Ok(Json(HealthCheckActionResult {
3154 started,
3155 already_running,
3156 missing,
3157 cancel_requested: Vec::new(),
3158 not_running: Vec::new(),
3159 }))
3160 }
3161
3162 async fn cancel_health_checks(
3163 proxy: ProxyService,
3164 Json(payload): Json<HealthCheckAction>,
3165 ) -> Result<Json<HealthCheckActionResult>, (StatusCode, String)> {
3166 let cfg = proxy.config.snapshot().await;
3167 let mgr = match proxy.service_name {
3168 "claude" => &cfg.claude,
3169 _ => &cfg.codex,
3170 };
3171
3172 let mut targets = if payload.all {
3173 mgr.configs.keys().cloned().collect::<Vec<_>>()
3174 } else {
3175 payload.config_names
3176 };
3177 targets.retain(|s| !s.trim().is_empty());
3178 targets.sort();
3179 targets.dedup();
3180 if targets.is_empty() {
3181 return Err((
3182 StatusCode::BAD_REQUEST,
3183 "expected { all: true } or non-empty config_names".to_string(),
3184 ));
3185 }
3186
3187 let now = now_ms();
3188 let mut cancel_requested = Vec::new();
3189 let mut not_running = Vec::new();
3190 let mut missing = Vec::new();
3191 for name in targets {
3192 if !mgr.configs.contains_key(&name) {
3193 missing.push(name);
3194 continue;
3195 }
3196 let ok = proxy
3197 .state
3198 .request_cancel_health_check(proxy.service_name, &name, now)
3199 .await;
3200 if ok {
3201 cancel_requested.push(name);
3202 } else {
3203 not_running.push(name);
3204 }
3205 }
3206
3207 Ok(Json(HealthCheckActionResult {
3208 started: Vec::new(),
3209 already_running: Vec::new(),
3210 missing,
3211 cancel_requested,
3212 not_running,
3213 }))
3214 }
3215
3216 async fn list_configs(
3217 proxy: ProxyService,
3218 ) -> Result<Json<Vec<crate::dashboard_core::ConfigOption>>, (StatusCode, String)> {
3219 let cfg = proxy.config.snapshot().await;
3220 let mgr = match proxy.service_name {
3221 "claude" => &cfg.claude,
3222 _ => &cfg.codex,
3223 };
3224 let mut out = mgr
3225 .configs
3226 .iter()
3227 .map(|(name, c)| crate::dashboard_core::ConfigOption {
3228 name: name.clone(),
3229 alias: c.alias.clone(),
3230 enabled: c.enabled,
3231 level: c.level.clamp(1, 10),
3232 })
3233 .collect::<Vec<_>>();
3234 out.sort_by(|a, b| a.level.cmp(&b.level).then_with(|| a.name.cmp(&b.name)));
3235 Ok(Json(out))
3236 }
3237
3238 let p0 = proxy.clone();
3239 let p1 = proxy.clone();
3240 let p2 = proxy.clone();
3241 let p3 = proxy.clone();
3242 let p4 = proxy.clone();
3243 let p5 = proxy.clone();
3244 let p6 = proxy.clone();
3245 let p7 = proxy.clone();
3246 let p8 = proxy.clone();
3247 let p9 = proxy.clone();
3248 let p10 = proxy.clone();
3249 let p11 = proxy.clone();
3250 let p12 = proxy.clone();
3251 let p13 = proxy.clone();
3252 let p14 = proxy.clone();
3253 let p15 = proxy.clone();
3254 let p16 = proxy.clone();
3255 let p17 = proxy.clone();
3256 let p18 = proxy.clone();
3257 let p19 = proxy.clone();
3258 let p20 = proxy.clone();
3259 let p21 = proxy.clone();
3260 let p22 = proxy.clone();
3261 let p23 = proxy.clone();
3262 let p24 = proxy.clone();
3263 let p25 = proxy.clone();
3264
3265 Router::new()
3266 .route(
3268 "/__codex_helper/api/v1/capabilities",
3269 get(move || api_capabilities(p8.clone())),
3270 )
3271 .route(
3272 "/__codex_helper/api/v1/snapshot",
3273 get(move |q| api_v1_snapshot(p25.clone(), q)),
3274 )
3275 .route(
3276 "/__codex_helper/api/v1/status/active",
3277 get(move || list_active_requests(p9.clone())),
3278 )
3279 .route(
3280 "/__codex_helper/api/v1/status/recent",
3281 get(move |q| list_recent_finished(p10.clone(), q)),
3282 )
3283 .route(
3284 "/__codex_helper/api/v1/status/session-stats",
3285 get(move || list_session_stats(p11.clone())),
3286 )
3287 .route(
3288 "/__codex_helper/api/v1/status/health-checks",
3289 get(move || list_health_checks(p21.clone())),
3290 )
3291 .route(
3292 "/__codex_helper/api/v1/status/config-health",
3293 get(move || list_config_health(p22.clone())),
3294 )
3295 .route(
3296 "/__codex_helper/api/v1/config/runtime",
3297 get(move || runtime_config_status(p12.clone())),
3298 )
3299 .route(
3300 "/__codex_helper/api/v1/config/reload",
3301 post(move || reload_runtime_config(p13.clone())),
3302 )
3303 .route(
3304 "/__codex_helper/api/v1/configs",
3305 get(move || list_configs(p14.clone())),
3306 )
3307 .route(
3308 "/__codex_helper/api/v1/overrides/session/effort",
3309 get(move || list_session_overrides(p15.clone()))
3310 .post(move |payload| set_session_override(p16.clone(), payload)),
3311 )
3312 .route(
3313 "/__codex_helper/api/v1/overrides/session/config",
3314 get(move || list_session_config_overrides(p17.clone()))
3315 .post(move |payload| set_session_config_override(p18.clone(), payload)),
3316 )
3317 .route(
3318 "/__codex_helper/api/v1/overrides/global-config",
3319 get(move || get_global_config_override(p19.clone()))
3320 .post(move |payload| set_global_config_override(p20.clone(), payload)),
3321 )
3322 .route(
3323 "/__codex_helper/api/v1/healthcheck/start",
3324 post(move |payload| start_health_checks(p23.clone(), payload)),
3325 )
3326 .route(
3327 "/__codex_helper/api/v1/healthcheck/cancel",
3328 post(move |payload| cancel_health_checks(p24.clone(), payload)),
3329 )
3330 .route(
3331 "/__codex_helper/override/session",
3332 get(move || list_session_overrides(p0.clone()))
3333 .post(move |payload| set_session_override(p1.clone(), payload)),
3334 )
3335 .route(
3336 "/__codex_helper/config/runtime",
3337 get(move || runtime_config_status(p5.clone())),
3338 )
3339 .route(
3340 "/__codex_helper/config/reload",
3341 get(move || runtime_config_status(p6.clone()))
3342 .post(move || reload_runtime_config(p7.clone())),
3343 )
3344 .route(
3345 "/__codex_helper/status/active",
3346 get(move || list_active_requests(p3.clone())),
3347 )
3348 .route(
3349 "/__codex_helper/status/recent",
3350 get(move |q| list_recent_finished(p4.clone(), q)),
3351 )
3352 .route("/{*path}", any(move |req| handle_proxy(p2.clone(), req)))
3353}