1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use reqwest::Client;
5
6use crate::config::{
7 PersistedProviderSpec, PersistedProvidersCatalog, PersistedRoutingSpec, ProxyConfig,
8 ProxyConfigV4, ServiceConfigManager,
9};
10use crate::filter::RequestFilter;
11use crate::lb::LbState;
12use crate::routing_explain::RoutingExplainResponse;
13use crate::routing_ir::RouteRequestContext;
14use crate::state::{ProxyState, SessionBinding, SessionContinuityMode};
15
16use super::profile_defaults::effective_default_profile_name;
17use super::{
18 PersistedRoutingUpsertRequest, ProfilesResponse, ProviderBalanceRefreshResponse,
19 ProxyControlError, ProxyService, ReloadResult, RuntimeConfig, RuntimeStatusResponse,
20};
21
22impl ProxyService {
23 pub fn new(
24 client: Client,
25 config: Arc<ProxyConfig>,
26 service_name: &'static str,
27 lb_states: Arc<Mutex<HashMap<String, LbState>>>,
28 ) -> Self {
29 Self::new_with_v4_source(client, config, None, service_name, lb_states)
30 }
31
32 pub fn new_with_v4_source(
33 client: Client,
34 config: Arc<ProxyConfig>,
35 v4_source: Option<Arc<ProxyConfigV4>>,
36 service_name: &'static str,
37 lb_states: Arc<Mutex<HashMap<String, LbState>>>,
38 ) -> Self {
39 let state = ProxyState::new_with_lb_states(Some(lb_states.clone()));
40 ProxyState::spawn_cleanup_task(state.clone());
41 if !cfg!(test) {
42 let state = state.clone();
43 let log_path = crate::logging::request_log_path();
44 let mut base_url_to_provider_id = HashMap::new();
45 let mgr = match service_name {
46 "claude" => &config.claude,
47 _ => &config.codex,
48 };
49 for svc in mgr.stations().values() {
50 for up in &svc.upstreams {
51 if let Some(pid) = up.tags.get("provider_id") {
52 base_url_to_provider_id.insert(up.base_url.clone(), pid.clone());
53 }
54 }
55 }
56 tokio::spawn(async move {
57 let _ = state
58 .replay_usage_from_requests_log(service_name, log_path, base_url_to_provider_id)
59 .await;
60 });
61 }
62 Self {
63 client,
64 config: Arc::new(RuntimeConfig::new_with_v4(config, v4_source)),
65 service_name,
66 lb_states,
67 filter: RequestFilter::new(),
68 state,
69 }
70 }
71
72 pub(super) fn service_manager<'a>(&self, cfg: &'a ProxyConfig) -> &'a ServiceConfigManager {
73 match self.service_name {
74 "codex" => &cfg.codex,
75 "claude" => &cfg.claude,
76 _ => &cfg.codex,
77 }
78 }
79
80 pub(super) async fn ensure_default_session_binding(
81 &self,
82 mgr: &ServiceConfigManager,
83 session_id: &str,
84 now_ms: u64,
85 ) -> Option<SessionBinding> {
86 if let Some(binding) = self.state.get_session_binding(session_id).await {
87 self.state.touch_session_binding(session_id, now_ms).await;
88 return Some(binding);
89 }
90
91 let profile_name =
92 effective_default_profile_name(self.state.as_ref(), self.service_name, mgr).await?;
93 let profile = crate::config::resolve_service_profile(mgr, profile_name.as_str()).ok()?;
94 let binding = SessionBinding {
95 session_id: session_id.to_string(),
96 profile_name: Some(profile_name),
97 station_name: profile.station.clone(),
98 model: profile.model.clone(),
99 reasoning_effort: profile.reasoning_effort.clone(),
100 service_tier: profile.service_tier.clone(),
101 continuity_mode: SessionContinuityMode::DefaultProfile,
102 created_at_ms: now_ms,
103 updated_at_ms: now_ms,
104 last_seen_ms: now_ms,
105 };
106 self.state.set_session_binding(binding.clone()).await;
107 Some(binding)
108 }
109
110 pub fn state_handle(&self) -> Arc<ProxyState> {
111 self.state.clone()
112 }
113
114 pub fn spawn_initial_balance_refresh(&self) {
115 if cfg!(test) {
116 return;
117 }
118
119 let proxy = self.clone();
120 tokio::spawn(async move {
121 match super::providers_api::refresh_provider_balances_for_proxy(&proxy, None, None)
122 .await
123 {
124 Ok(summary) => {
125 tracing::info!(
126 "initial provider balance refresh finished: attempted={}, refreshed={}, failed={}, missing_token={}, auto_refreshed={}",
127 summary.attempted,
128 summary.refreshed,
129 summary.failed,
130 summary.missing_token,
131 summary.auto_refreshed
132 );
133 }
134 Err((status, message)) => {
135 tracing::warn!(
136 "initial provider balance refresh failed before polling: status={}, {}",
137 status,
138 message
139 );
140 }
141 }
142 });
143 }
144
145 pub async fn refresh_provider_balances(
146 &self,
147 station_name_filter: Option<&str>,
148 provider_id_filter: Option<&str>,
149 ) -> Result<ProviderBalanceRefreshResponse, ProxyControlError> {
150 let refresh = super::providers_api::refresh_provider_balances_for_proxy(
151 self,
152 station_name_filter,
153 provider_id_filter,
154 )
155 .await
156 .map_err(ProxyControlError::from)?;
157 let provider_balances = self
158 .state
159 .get_provider_balance_view(self.service_name)
160 .await;
161
162 Ok(ProviderBalanceRefreshResponse {
163 service_name: self.service_name.to_string(),
164 refresh,
165 provider_balances,
166 })
167 }
168
169 pub async fn runtime_status(&self) -> RuntimeStatusResponse {
170 super::api_responses::build_runtime_status_response(self).await
171 }
172
173 pub async fn reload_runtime_config(&self) -> Result<ReloadResult, ProxyControlError> {
174 let changed = self.config.force_reload_from_disk().await.map_err(|err| {
175 ProxyControlError::new(
176 axum::http::StatusCode::INTERNAL_SERVER_ERROR,
177 err.to_string(),
178 )
179 })?;
180 if changed {
181 super::control_plane_service::prune_runtime_observability_after_reload(self).await;
182 }
183 let status = self.runtime_status().await;
184 Ok(super::api_responses::build_reload_result(changed, status))
185 }
186
187 pub async fn profiles(&self) -> ProfilesResponse {
188 super::api_responses::make_profiles_response(self).await
189 }
190
191 pub async fn set_runtime_default_profile(
192 &self,
193 profile_name: Option<String>,
194 ) -> Result<(), ProxyControlError> {
195 let profile_name = normalize_optional_control_name(profile_name);
196
197 if let Some(profile_name) = profile_name {
198 let cfg = self.config.snapshot().await;
199 let mgr = self.service_manager(cfg.as_ref());
200 if mgr.profile(profile_name.as_str()).is_none() {
201 return Err(ProxyControlError::new(
202 axum::http::StatusCode::NOT_FOUND,
203 format!("profile '{}' not found", profile_name),
204 ));
205 }
206 let resolved = crate::config::resolve_service_profile(mgr, profile_name.as_str())
207 .map_err(|err| {
208 ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
209 })?;
210 crate::config::validate_profile_station_compatibility(
211 self.service_name,
212 mgr,
213 profile_name.as_str(),
214 &resolved,
215 )
216 .map_err(|err| {
217 ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
218 })?;
219 self.state
220 .set_runtime_default_profile_override(
221 self.service_name.to_string(),
222 profile_name,
223 crate::logging::now_ms(),
224 )
225 .await;
226 } else {
227 self.state
228 .clear_runtime_default_profile_override(self.service_name)
229 .await;
230 }
231
232 Ok(())
233 }
234
235 pub async fn set_persisted_default_profile(
236 &self,
237 profile_name: Option<String>,
238 ) -> Result<ProfilesResponse, ProxyControlError> {
239 let profile_name = normalize_optional_control_name(profile_name);
240
241 if let super::control_plane_service::PersistedProxySettingsDocument::V4(mut document) =
242 super::control_plane_service::load_persisted_proxy_settings_document()
243 .await
244 .map_err(ProxyControlError::from)?
245 {
246 if let Some(profile_name) = profile_name.as_deref() {
247 let view =
248 super::control_plane_service::service_view_v4(&document, self.service_name);
249 if !view.profiles.contains_key(profile_name) {
250 return Err(ProxyControlError::new(
251 axum::http::StatusCode::NOT_FOUND,
252 format!("profile '{}' not found", profile_name),
253 ));
254 }
255 let runtime = crate::config::compile_v4_to_runtime(&document).map_err(|err| {
256 ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
257 })?;
258 let mgr = super::persisted_registry_api::runtime_service_manager_for_document(
259 &runtime,
260 self.service_name,
261 );
262 let resolved =
263 crate::config::resolve_service_profile(mgr, profile_name).map_err(|err| {
264 ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
265 })?;
266 crate::config::validate_profile_station_compatibility(
267 self.service_name,
268 mgr,
269 profile_name,
270 &resolved,
271 )
272 .map_err(|err| {
273 ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
274 })?;
275 }
276 let view =
277 super::control_plane_service::service_view_v4_mut(&mut document, self.service_name);
278 view.default_profile = profile_name;
279 super::control_plane_service::save_persisted_proxy_settings_document_and_reload(
280 self,
281 super::control_plane_service::PersistedProxySettingsDocument::V4(document),
282 )
283 .await
284 .map_err(ProxyControlError::from)?;
285 return Ok(self.profiles().await);
286 }
287
288 let cfg_snapshot = self.config.snapshot().await;
289 let mut cfg = cfg_snapshot.as_ref().clone();
290 let mgr =
291 super::control_plane_service::runtime_service_manager_mut(&mut cfg, self.service_name);
292
293 if let Some(profile_name) = profile_name.as_deref() {
294 if mgr.profile(profile_name).is_none() {
295 return Err(ProxyControlError::new(
296 axum::http::StatusCode::NOT_FOUND,
297 format!("profile '{}' not found", profile_name),
298 ));
299 }
300 let resolved =
301 crate::config::resolve_service_profile(mgr, profile_name).map_err(|err| {
302 ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
303 })?;
304 crate::config::validate_profile_station_compatibility(
305 self.service_name,
306 mgr,
307 profile_name,
308 &resolved,
309 )
310 .map_err(|err| {
311 ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
312 })?;
313 }
314 mgr.default_profile = profile_name;
315
316 super::control_plane_service::save_runtime_profile_settings_and_reload(self, cfg)
317 .await
318 .map_err(ProxyControlError::from)
319 }
320
321 pub async fn persisted_provider_specs(
322 &self,
323 ) -> Result<PersistedProvidersCatalog, ProxyControlError> {
324 let catalog = match super::control_plane_service::load_persisted_proxy_settings_document()
325 .await
326 .map_err(ProxyControlError::from)?
327 {
328 super::control_plane_service::PersistedProxySettingsDocument::V2(cfg) => {
329 crate::config::build_persisted_provider_catalog(
330 super::control_plane_service::service_view_v2(&cfg, self.service_name),
331 )
332 }
333 super::control_plane_service::PersistedProxySettingsDocument::V4(cfg) => {
334 PersistedProvidersCatalog {
335 providers: super::control_plane_service::service_view_v4(
336 &cfg,
337 self.service_name,
338 )
339 .providers
340 .iter()
341 .map(|(name, provider)| {
342 persisted_provider_spec_from_v4_for_service(name, provider)
343 })
344 .collect(),
345 }
346 }
347 };
348 Ok(catalog)
349 }
350
351 pub async fn upsert_persisted_provider_spec(
352 &self,
353 provider_name: String,
354 provider: PersistedProviderSpec,
355 ) -> Result<(), ProxyControlError> {
356 super::persisted_registry_api::upsert_persisted_provider_spec_for_proxy(
357 self,
358 provider_name,
359 provider.into(),
360 )
361 .await
362 .map(|_| ())
363 .map_err(ProxyControlError::from)
364 }
365
366 pub async fn persisted_routing_spec(&self) -> Result<PersistedRoutingSpec, ProxyControlError> {
367 match super::control_plane_service::load_persisted_proxy_settings_document()
368 .await
369 .map_err(ProxyControlError::from)?
370 {
371 super::control_plane_service::PersistedProxySettingsDocument::V4(cfg) => {
372 let view = super::control_plane_service::service_view_v4(&cfg, self.service_name);
373 Ok(persisted_routing_spec_from_v4_for_service(view))
374 }
375 super::control_plane_service::PersistedProxySettingsDocument::V2(_) => {
376 Err(ProxyControlError::new(
377 axum::http::StatusCode::BAD_REQUEST,
378 "routing API requires a version = 5 route graph config",
379 ))
380 }
381 }
382 }
383
384 pub async fn upsert_persisted_routing_spec(
385 &self,
386 payload: PersistedRoutingUpsertRequest,
387 ) -> Result<PersistedRoutingSpec, ProxyControlError> {
388 super::persisted_registry_api::upsert_persisted_routing_spec_for_proxy(self, payload)
389 .await
390 .map_err(ProxyControlError::from)
391 }
392
393 pub async fn routing_explain(
394 &self,
395 request: RouteRequestContext,
396 session_id: Option<String>,
397 ) -> Result<RoutingExplainResponse, ProxyControlError> {
398 super::runtime_admin_api::routing_explain_for_proxy(self, request, session_id)
399 .await
400 .map_err(ProxyControlError::from)
401 }
402}
403
/// Trims an optional control-plane name and treats blank names as absent.
///
/// Returns `None` when `value` is `None` or contains only whitespace; otherwise
/// returns the name with leading and trailing whitespace removed.
fn normalize_optional_control_name(value: Option<String>) -> Option<String> {
    let trimmed = value.as_deref()?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
411
412fn persisted_provider_spec_from_v4_for_service(
413 name: &str,
414 provider: &crate::config::ProviderConfigV4,
415) -> PersistedProviderSpec {
416 let mut endpoints = Vec::new();
417 if let Some(base_url) = provider
418 .base_url
419 .as_deref()
420 .map(str::trim)
421 .filter(|value| !value.is_empty())
422 {
423 endpoints.push(crate::config::PersistedProviderEndpointSpec {
424 name: "default".to_string(),
425 base_url: base_url.to_string(),
426 enabled: provider.enabled,
427 priority: 0,
428 tags: std::collections::BTreeMap::new(),
429 });
430 }
431 endpoints.extend(provider.endpoints.iter().map(|(endpoint_name, endpoint)| {
432 crate::config::PersistedProviderEndpointSpec {
433 name: endpoint_name.clone(),
434 base_url: endpoint.base_url.clone(),
435 enabled: endpoint.enabled,
436 priority: endpoint.priority,
437 tags: endpoint.tags.clone(),
438 }
439 }));
440
441 PersistedProviderSpec {
442 name: name.to_string(),
443 alias: provider.alias.clone(),
444 enabled: provider.enabled,
445 auth_token_env: provider.inline_auth.auth_token_env.clone(),
446 api_key_env: provider.inline_auth.api_key_env.clone(),
447 tags: provider.tags.clone(),
448 endpoints,
449 }
450}
451
452fn persisted_routing_spec_from_v4_for_service(
453 view: &crate::config::ServiceViewV4,
454) -> PersistedRoutingSpec {
455 let routing = crate::config::effective_v4_routing(view);
456 let order = crate::config::resolved_v4_provider_order("persisted-routing", view)
457 .unwrap_or_else(|_| view.providers.keys().cloned().collect::<Vec<_>>());
458 let entry_node = routing.entry_node();
459 PersistedRoutingSpec {
460 entry: routing.entry.clone(),
461 affinity_policy: routing.affinity_policy,
462 fallback_ttl_ms: routing.fallback_ttl_ms,
463 reprobe_preferred_after_ms: routing.reprobe_preferred_after_ms,
464 routes: routing.routes.clone(),
465 policy: entry_node
466 .map(|node| node.strategy)
467 .unwrap_or(crate::config::RoutingPolicyV4::OrderedFailover),
468 order: order.clone(),
469 target: entry_node.and_then(|node| node.target.clone()),
470 prefer_tags: entry_node
471 .map(|node| node.prefer_tags.clone())
472 .unwrap_or_default(),
473 on_exhausted: entry_node
474 .map(|node| node.on_exhausted)
475 .unwrap_or(crate::config::RoutingExhaustedActionV4::Continue),
476 entry_strategy: entry_node
477 .map(|node| node.strategy)
478 .unwrap_or(crate::config::RoutingPolicyV4::OrderedFailover),
479 expanded_order: order,
480 entry_target: entry_node.and_then(|node| node.target.clone()),
481 providers: view
482 .providers
483 .iter()
484 .map(
485 |(name, provider)| crate::config::PersistedRoutingProviderRef {
486 name: name.clone(),
487 alias: provider.alias.clone(),
488 enabled: provider.enabled,
489 tags: provider.tags.clone(),
490 },
491 )
492 .collect(),
493 }
494}