Skip to main content

codex_helper_core/proxy/
service_core.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use reqwest::Client;
5
6use crate::config::{
7    PersistedProviderSpec, PersistedProvidersCatalog, PersistedRoutingSpec, ProxyConfig,
8    ProxyConfigV4, ServiceConfigManager,
9};
10use crate::filter::RequestFilter;
11use crate::lb::LbState;
12use crate::routing_explain::RoutingExplainResponse;
13use crate::routing_ir::RouteRequestContext;
14use crate::state::{ProxyState, SessionBinding, SessionContinuityMode};
15
16use super::profile_defaults::effective_default_profile_name;
17use super::{
18    PersistedRoutingUpsertRequest, ProfilesResponse, ProviderBalanceRefreshResponse,
19    ProxyControlError, ProxyService, ReloadResult, RuntimeConfig, RuntimeStatusResponse,
20};
21
22impl ProxyService {
23    pub fn new(
24        client: Client,
25        config: Arc<ProxyConfig>,
26        service_name: &'static str,
27        lb_states: Arc<Mutex<HashMap<String, LbState>>>,
28    ) -> Self {
29        Self::new_with_v4_source(client, config, None, service_name, lb_states)
30    }
31
32    pub fn new_with_v4_source(
33        client: Client,
34        config: Arc<ProxyConfig>,
35        v4_source: Option<Arc<ProxyConfigV4>>,
36        service_name: &'static str,
37        lb_states: Arc<Mutex<HashMap<String, LbState>>>,
38    ) -> Self {
39        let state = ProxyState::new_with_lb_states(Some(lb_states.clone()));
40        ProxyState::spawn_cleanup_task(state.clone());
41        if !cfg!(test) {
42            let state = state.clone();
43            let log_path = crate::logging::request_log_path();
44            let mut base_url_to_provider_id = HashMap::new();
45            let mgr = match service_name {
46                "claude" => &config.claude,
47                _ => &config.codex,
48            };
49            for svc in mgr.stations().values() {
50                for up in &svc.upstreams {
51                    if let Some(pid) = up.tags.get("provider_id") {
52                        base_url_to_provider_id.insert(up.base_url.clone(), pid.clone());
53                    }
54                }
55            }
56            tokio::spawn(async move {
57                let _ = state
58                    .replay_usage_from_requests_log(service_name, log_path, base_url_to_provider_id)
59                    .await;
60            });
61        }
62        Self {
63            client,
64            config: Arc::new(RuntimeConfig::new_with_v4(config, v4_source)),
65            service_name,
66            lb_states,
67            filter: RequestFilter::new(),
68            state,
69        }
70    }
71
72    pub(super) fn service_manager<'a>(&self, cfg: &'a ProxyConfig) -> &'a ServiceConfigManager {
73        match self.service_name {
74            "codex" => &cfg.codex,
75            "claude" => &cfg.claude,
76            _ => &cfg.codex,
77        }
78    }
79
80    pub(super) async fn ensure_default_session_binding(
81        &self,
82        mgr: &ServiceConfigManager,
83        session_id: &str,
84        now_ms: u64,
85    ) -> Option<SessionBinding> {
86        if let Some(binding) = self.state.get_session_binding(session_id).await {
87            self.state.touch_session_binding(session_id, now_ms).await;
88            return Some(binding);
89        }
90
91        let profile_name =
92            effective_default_profile_name(self.state.as_ref(), self.service_name, mgr).await?;
93        let profile = crate::config::resolve_service_profile(mgr, profile_name.as_str()).ok()?;
94        let binding = SessionBinding {
95            session_id: session_id.to_string(),
96            profile_name: Some(profile_name),
97            station_name: profile.station.clone(),
98            model: profile.model.clone(),
99            reasoning_effort: profile.reasoning_effort.clone(),
100            service_tier: profile.service_tier.clone(),
101            continuity_mode: SessionContinuityMode::DefaultProfile,
102            created_at_ms: now_ms,
103            updated_at_ms: now_ms,
104            last_seen_ms: now_ms,
105        };
106        self.state.set_session_binding(binding.clone()).await;
107        Some(binding)
108    }
109
110    pub fn state_handle(&self) -> Arc<ProxyState> {
111        self.state.clone()
112    }
113
114    pub fn spawn_initial_balance_refresh(&self) {
115        if cfg!(test) {
116            return;
117        }
118
119        let proxy = self.clone();
120        tokio::spawn(async move {
121            match super::providers_api::refresh_provider_balances_for_proxy(&proxy, None, None)
122                .await
123            {
124                Ok(summary) => {
125                    tracing::info!(
126                        "initial provider balance refresh finished: attempted={}, refreshed={}, failed={}, missing_token={}, auto_refreshed={}",
127                        summary.attempted,
128                        summary.refreshed,
129                        summary.failed,
130                        summary.missing_token,
131                        summary.auto_refreshed
132                    );
133                }
134                Err((status, message)) => {
135                    tracing::warn!(
136                        "initial provider balance refresh failed before polling: status={}, {}",
137                        status,
138                        message
139                    );
140                }
141            }
142        });
143    }
144
145    pub async fn refresh_provider_balances(
146        &self,
147        station_name_filter: Option<&str>,
148        provider_id_filter: Option<&str>,
149    ) -> Result<ProviderBalanceRefreshResponse, ProxyControlError> {
150        let refresh = super::providers_api::refresh_provider_balances_for_proxy(
151            self,
152            station_name_filter,
153            provider_id_filter,
154        )
155        .await
156        .map_err(ProxyControlError::from)?;
157        let provider_balances = self
158            .state
159            .get_provider_balance_view(self.service_name)
160            .await;
161
162        Ok(ProviderBalanceRefreshResponse {
163            service_name: self.service_name.to_string(),
164            refresh,
165            provider_balances,
166        })
167    }
168
169    pub async fn runtime_status(&self) -> RuntimeStatusResponse {
170        super::api_responses::build_runtime_status_response(self).await
171    }
172
173    pub async fn reload_runtime_config(&self) -> Result<ReloadResult, ProxyControlError> {
174        let changed = self.config.force_reload_from_disk().await.map_err(|err| {
175            ProxyControlError::new(
176                axum::http::StatusCode::INTERNAL_SERVER_ERROR,
177                err.to_string(),
178            )
179        })?;
180        if changed {
181            super::control_plane_service::prune_runtime_observability_after_reload(self).await;
182        }
183        let status = self.runtime_status().await;
184        Ok(super::api_responses::build_reload_result(changed, status))
185    }
186
187    pub async fn profiles(&self) -> ProfilesResponse {
188        super::api_responses::make_profiles_response(self).await
189    }
190
191    pub async fn set_runtime_default_profile(
192        &self,
193        profile_name: Option<String>,
194    ) -> Result<(), ProxyControlError> {
195        let profile_name = normalize_optional_control_name(profile_name);
196
197        if let Some(profile_name) = profile_name {
198            let cfg = self.config.snapshot().await;
199            let mgr = self.service_manager(cfg.as_ref());
200            if mgr.profile(profile_name.as_str()).is_none() {
201                return Err(ProxyControlError::new(
202                    axum::http::StatusCode::NOT_FOUND,
203                    format!("profile '{}' not found", profile_name),
204                ));
205            }
206            let resolved = crate::config::resolve_service_profile(mgr, profile_name.as_str())
207                .map_err(|err| {
208                    ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
209                })?;
210            crate::config::validate_profile_station_compatibility(
211                self.service_name,
212                mgr,
213                profile_name.as_str(),
214                &resolved,
215            )
216            .map_err(|err| {
217                ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
218            })?;
219            self.state
220                .set_runtime_default_profile_override(
221                    self.service_name.to_string(),
222                    profile_name,
223                    crate::logging::now_ms(),
224                )
225                .await;
226        } else {
227            self.state
228                .clear_runtime_default_profile_override(self.service_name)
229                .await;
230        }
231
232        Ok(())
233    }
234
235    pub async fn set_persisted_default_profile(
236        &self,
237        profile_name: Option<String>,
238    ) -> Result<ProfilesResponse, ProxyControlError> {
239        let profile_name = normalize_optional_control_name(profile_name);
240
241        if let super::control_plane_service::PersistedProxySettingsDocument::V4(mut document) =
242            super::control_plane_service::load_persisted_proxy_settings_document()
243                .await
244                .map_err(ProxyControlError::from)?
245        {
246            if let Some(profile_name) = profile_name.as_deref() {
247                let view =
248                    super::control_plane_service::service_view_v4(&document, self.service_name);
249                if !view.profiles.contains_key(profile_name) {
250                    return Err(ProxyControlError::new(
251                        axum::http::StatusCode::NOT_FOUND,
252                        format!("profile '{}' not found", profile_name),
253                    ));
254                }
255                let runtime = crate::config::compile_v4_to_runtime(&document).map_err(|err| {
256                    ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
257                })?;
258                let mgr = super::persisted_registry_api::runtime_service_manager_for_document(
259                    &runtime,
260                    self.service_name,
261                );
262                let resolved =
263                    crate::config::resolve_service_profile(mgr, profile_name).map_err(|err| {
264                        ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
265                    })?;
266                crate::config::validate_profile_station_compatibility(
267                    self.service_name,
268                    mgr,
269                    profile_name,
270                    &resolved,
271                )
272                .map_err(|err| {
273                    ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
274                })?;
275            }
276            let view =
277                super::control_plane_service::service_view_v4_mut(&mut document, self.service_name);
278            view.default_profile = profile_name;
279            super::control_plane_service::save_persisted_proxy_settings_document_and_reload(
280                self,
281                super::control_plane_service::PersistedProxySettingsDocument::V4(document),
282            )
283            .await
284            .map_err(ProxyControlError::from)?;
285            return Ok(self.profiles().await);
286        }
287
288        let cfg_snapshot = self.config.snapshot().await;
289        let mut cfg = cfg_snapshot.as_ref().clone();
290        let mgr =
291            super::control_plane_service::runtime_service_manager_mut(&mut cfg, self.service_name);
292
293        if let Some(profile_name) = profile_name.as_deref() {
294            if mgr.profile(profile_name).is_none() {
295                return Err(ProxyControlError::new(
296                    axum::http::StatusCode::NOT_FOUND,
297                    format!("profile '{}' not found", profile_name),
298                ));
299            }
300            let resolved =
301                crate::config::resolve_service_profile(mgr, profile_name).map_err(|err| {
302                    ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
303                })?;
304            crate::config::validate_profile_station_compatibility(
305                self.service_name,
306                mgr,
307                profile_name,
308                &resolved,
309            )
310            .map_err(|err| {
311                ProxyControlError::new(axum::http::StatusCode::BAD_REQUEST, err.to_string())
312            })?;
313        }
314        mgr.default_profile = profile_name;
315
316        super::control_plane_service::save_runtime_profile_settings_and_reload(self, cfg)
317            .await
318            .map_err(ProxyControlError::from)
319    }
320
321    pub async fn persisted_provider_specs(
322        &self,
323    ) -> Result<PersistedProvidersCatalog, ProxyControlError> {
324        let catalog = match super::control_plane_service::load_persisted_proxy_settings_document()
325            .await
326            .map_err(ProxyControlError::from)?
327        {
328            super::control_plane_service::PersistedProxySettingsDocument::V2(cfg) => {
329                crate::config::build_persisted_provider_catalog(
330                    super::control_plane_service::service_view_v2(&cfg, self.service_name),
331                )
332            }
333            super::control_plane_service::PersistedProxySettingsDocument::V4(cfg) => {
334                PersistedProvidersCatalog {
335                    providers: super::control_plane_service::service_view_v4(
336                        &cfg,
337                        self.service_name,
338                    )
339                    .providers
340                    .iter()
341                    .map(|(name, provider)| {
342                        persisted_provider_spec_from_v4_for_service(name, provider)
343                    })
344                    .collect(),
345                }
346            }
347        };
348        Ok(catalog)
349    }
350
351    pub async fn upsert_persisted_provider_spec(
352        &self,
353        provider_name: String,
354        provider: PersistedProviderSpec,
355    ) -> Result<(), ProxyControlError> {
356        super::persisted_registry_api::upsert_persisted_provider_spec_for_proxy(
357            self,
358            provider_name,
359            provider.into(),
360        )
361        .await
362        .map(|_| ())
363        .map_err(ProxyControlError::from)
364    }
365
366    pub async fn persisted_routing_spec(&self) -> Result<PersistedRoutingSpec, ProxyControlError> {
367        match super::control_plane_service::load_persisted_proxy_settings_document()
368            .await
369            .map_err(ProxyControlError::from)?
370        {
371            super::control_plane_service::PersistedProxySettingsDocument::V4(cfg) => {
372                let view = super::control_plane_service::service_view_v4(&cfg, self.service_name);
373                Ok(persisted_routing_spec_from_v4_for_service(view))
374            }
375            super::control_plane_service::PersistedProxySettingsDocument::V2(_) => {
376                Err(ProxyControlError::new(
377                    axum::http::StatusCode::BAD_REQUEST,
378                    "routing API requires a version = 5 route graph config",
379                ))
380            }
381        }
382    }
383
384    pub async fn upsert_persisted_routing_spec(
385        &self,
386        payload: PersistedRoutingUpsertRequest,
387    ) -> Result<PersistedRoutingSpec, ProxyControlError> {
388        super::persisted_registry_api::upsert_persisted_routing_spec_for_proxy(self, payload)
389            .await
390            .map_err(ProxyControlError::from)
391    }
392
393    pub async fn routing_explain(
394        &self,
395        request: RouteRequestContext,
396        session_id: Option<String>,
397    ) -> Result<RoutingExplainResponse, ProxyControlError> {
398        super::runtime_admin_api::routing_explain_for_proxy(self, request, session_id)
399            .await
400            .map_err(ProxyControlError::from)
401    }
402}
403
404fn normalize_optional_control_name(value: Option<String>) -> Option<String> {
405    value
406        .as_deref()
407        .map(str::trim)
408        .filter(|value| !value.is_empty())
409        .map(ToOwned::to_owned)
410}
411
412fn persisted_provider_spec_from_v4_for_service(
413    name: &str,
414    provider: &crate::config::ProviderConfigV4,
415) -> PersistedProviderSpec {
416    let mut endpoints = Vec::new();
417    if let Some(base_url) = provider
418        .base_url
419        .as_deref()
420        .map(str::trim)
421        .filter(|value| !value.is_empty())
422    {
423        endpoints.push(crate::config::PersistedProviderEndpointSpec {
424            name: "default".to_string(),
425            base_url: base_url.to_string(),
426            enabled: provider.enabled,
427            priority: 0,
428            tags: std::collections::BTreeMap::new(),
429        });
430    }
431    endpoints.extend(provider.endpoints.iter().map(|(endpoint_name, endpoint)| {
432        crate::config::PersistedProviderEndpointSpec {
433            name: endpoint_name.clone(),
434            base_url: endpoint.base_url.clone(),
435            enabled: endpoint.enabled,
436            priority: endpoint.priority,
437            tags: endpoint.tags.clone(),
438        }
439    }));
440
441    PersistedProviderSpec {
442        name: name.to_string(),
443        alias: provider.alias.clone(),
444        enabled: provider.enabled,
445        auth_token_env: provider.inline_auth.auth_token_env.clone(),
446        api_key_env: provider.inline_auth.api_key_env.clone(),
447        tags: provider.tags.clone(),
448        endpoints,
449    }
450}
451
452fn persisted_routing_spec_from_v4_for_service(
453    view: &crate::config::ServiceViewV4,
454) -> PersistedRoutingSpec {
455    let routing = crate::config::effective_v4_routing(view);
456    let order = crate::config::resolved_v4_provider_order("persisted-routing", view)
457        .unwrap_or_else(|_| view.providers.keys().cloned().collect::<Vec<_>>());
458    let entry_node = routing.entry_node();
459    PersistedRoutingSpec {
460        entry: routing.entry.clone(),
461        affinity_policy: routing.affinity_policy,
462        fallback_ttl_ms: routing.fallback_ttl_ms,
463        reprobe_preferred_after_ms: routing.reprobe_preferred_after_ms,
464        routes: routing.routes.clone(),
465        policy: entry_node
466            .map(|node| node.strategy)
467            .unwrap_or(crate::config::RoutingPolicyV4::OrderedFailover),
468        order: order.clone(),
469        target: entry_node.and_then(|node| node.target.clone()),
470        prefer_tags: entry_node
471            .map(|node| node.prefer_tags.clone())
472            .unwrap_or_default(),
473        on_exhausted: entry_node
474            .map(|node| node.on_exhausted)
475            .unwrap_or(crate::config::RoutingExhaustedActionV4::Continue),
476        entry_strategy: entry_node
477            .map(|node| node.strategy)
478            .unwrap_or(crate::config::RoutingPolicyV4::OrderedFailover),
479        expanded_order: order,
480        entry_target: entry_node.and_then(|node| node.target.clone()),
481        providers: view
482            .providers
483            .iter()
484            .map(
485                |(name, provider)| crate::config::PersistedRoutingProviderRef {
486                    name: name.clone(),
487                    alias: provider.alias.clone(),
488                    enabled: provider.enabled,
489                    tags: provider.tags.clone(),
490                },
491            )
492            .collect(),
493    }
494}