Skip to main content

codex_helper_core/
state.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::io::{Read, Seek, SeekFrom};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::sync::OnceLock;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use serde_json::Value as JsonValue;
10use tokio::sync::RwLock;
11use tokio::time::{Duration, interval};
12
13pub use crate::balance::{
14    BalanceSnapshotStatus, ProviderBalanceSnapshot, StationRoutingBalanceSummary,
15};
16use crate::config::ServiceConfigManager;
17use crate::lb::{COOLDOWN_SECS, CooldownBackoff, FAILURE_THRESHOLD, LbState};
18#[cfg(test)]
19use crate::pricing::CostBreakdown;
20use crate::pricing::{CostAdjustments, estimate_request_cost_from_operator_catalog_for_service};
21use crate::routing_ir::{RoutePlanRuntimeState, RoutePlanUpstreamRuntimeState};
22use crate::runtime_identity::ProviderEndpointKey;
23use crate::sessions;
24use crate::usage::UsageMetrics;
25
26mod runtime_types;
27mod session_identity;
28
29use self::runtime_types::{
30    ConfigMetaOverride, RuntimeDefaultProfileOverride, UsageRollup, merge_station_health,
31};
32pub use self::runtime_types::{
33    HealthCheckStatus, LbConfigView, LbUpstreamView, PassiveHealthState, PassiveUpstreamHealth,
34    RuntimeConfigState, StationHealth, UpstreamHealth, UsageBucket, UsageRollupCoverage,
35    UsageRollupView,
36};
37pub use self::session_identity::{
38    ActiveRequest, FinishRequestParams, FinishedRequest, RequestObservability, ResolvedRouteValue,
39    RouteDecisionProvenance, RouteValueSource, SessionBinding, SessionContinuityMode,
40    SessionIdentityCard, SessionIdentityCardBuildInputs, SessionManualOverrides,
41    SessionObservationScope, SessionRouteAffinity, SessionRouteAffinityTarget, SessionStats,
42    build_session_identity_cards_from_parts, enrich_session_identity_cards_with_host_transcripts,
43    enrich_session_identity_cards_with_runtime,
44};
45use self::session_identity::{
46    SessionBindingEntry, SessionCwdCacheEntry, SessionEffortOverride, SessionModelOverride,
47    SessionRouteTargetOverride, SessionServiceTierOverride, SessionStationOverride,
48};
49
50type PassiveStationHealthMap =
51    HashMap<String, HashMap<String, HashMap<String, PassiveUpstreamHealth>>>;
52type ProviderBalanceMap =
53    HashMap<String, HashMap<String, HashMap<usize, HashMap<String, ProviderBalanceSnapshot>>>>;
54type ProviderBalanceSummaryMap = HashMap<String, HashMap<String, StationRoutingBalanceSummary>>;
55type ServiceLayoutSignature = Vec<(String, Vec<String>)>;
56
/// Mutable runtime health for a single provider endpoint.
///
/// Stored in `ProxyState::provider_endpoint_runtime_health`, keyed by a string
/// (presumably the service name — confirm) and then by [`ProviderEndpointKey`].
/// Like the rest of `ProxyState`, it is not persisted across restarts.
#[derive(Debug, Clone, Default)]
struct ProviderEndpointRuntimeHealth {
    // Count of failures observed; NOTE(review): presumably consecutive and reset on success.
    failure_count: u32,
    // `Instant` deadline (monotonic clock) for the endpoint's cooldown; presumably `None` when not cooling down.
    cooldown_until: Option<std::time::Instant>,
    // Flag for an endpoint considered out of usage/quota.
    usage_exhausted: bool,
    // NOTE(review): likely feeds the cooldown backoff (`CooldownBackoff`) — confirm with the updater.
    penalty_streak: u32,
    // Millisecond timestamp of the last healthy observation, if any.
    last_good_at_ms: Option<u64>,
}
65
/// Cached transcript-path lookup for one session.
///
/// Stored in `ProxyState::session_transcript_path_cache`; NOTE(review): presumably
/// governed by the `session_transcript_path_cache_ttl_ms` / `_max_entries` knobs.
#[derive(Debug, Clone)]
struct SessionTranscriptPathCacheEntry {
    // Resolved transcript path; `None` presumably caches a lookup that found nothing (TODO confirm).
    path: Option<String>,
    // Millisecond timestamp of the last resolution attempt.
    last_checked_ms: u64,
    // Millisecond timestamp of the last access to this entry.
    last_seen_ms: u64,
}
72
/// Resolved runtime knobs for session overrides, bindings and caches.
///
/// Built from `CODEX_HELPER_*` environment variables in
/// `ProxyState::new_with_lb_states` and copied field-for-field into `ProxyState`
/// by `new_with_runtime_policy`. All `*_ttl_ms` fields are milliseconds.
#[derive(Debug, Clone, Copy)]
struct RuntimePolicy {
    // Inactivity TTL for manual per-session overrides; env parsing never yields 0 (default 30 min).
    session_override_ttl_ms: u64,
    // Binding TTL; defaults to 0, matching the "bindings are sticky by default" policy.
    session_binding_ttl_ms: u64,
    session_binding_max_entries: usize,
    // Route-affinity TTL; 0 means affinities never expire (see `session_route_affinity_is_expired`).
    session_route_affinity_ttl_ms: u64,
    // LRU cap applied via `prune_lru_cache`; 0 disables the cap.
    session_route_affinity_max_entries: usize,
    session_cwd_cache_ttl_ms: u64,
    session_cwd_cache_max_entries: usize,
    session_transcript_path_cache_ttl_ms: u64,
    session_transcript_path_cache_max_entries: usize,
}
85
/// One failed upstream attempt, described for passive health tracking.
///
/// All fields are public so callers can build the record inline. `status_code`,
/// `error_class` and `error` are optional because a failure may not carry all of
/// them. `now_ms` is the observation time in milliseconds (presumably Unix-epoch
/// ms as produced by `unix_now_ms` — confirm with callers).
///
/// `Debug` and `Clone` are derived so the record can be logged and retained,
/// as is expected of public data types.
#[derive(Debug, Clone)]
pub struct PassiveUpstreamFailureRecord {
    pub service_name: String,
    pub station_name: String,
    pub base_url: String,
    pub status_code: Option<u16>,
    pub error_class: Option<String>,
    pub error: Option<String>,
    pub now_ms: u64,
}
95
/// Upper bound on how many finished requests are kept in memory.
///
/// Read once from `CODEX_HELPER_RECENT_FINISHED_MAX` and cached for the life of
/// the process. An unset, unparsable or zero value falls back to 2,000, and the
/// final value is always clamped into `200..=20_000`.
fn recent_finished_max() -> usize {
    static MAX: OnceLock<usize> = OnceLock::new();
    *MAX.get_or_init(|| {
        let configured = match std::env::var("CODEX_HELPER_RECENT_FINISHED_MAX") {
            Ok(raw) => raw.trim().parse::<usize>().ok(),
            Err(_) => None,
        };
        let requested = match configured {
            Some(n) if n > 0 => n,
            _ => 2_000,
        };
        requested.clamp(200, 20_000)
    })
}
107
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields 0 instead of an error.
fn unix_now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
114
/// Evicts the least-recently-seen entries until `cache` holds at most `max_entries`.
///
/// `last_seen` extracts each entry's recency stamp, and the smallest stamps are
/// evicted first. A `max_entries` of 0 disables pruning entirely. Entries with
/// equal stamps are ordered by the map's (unspecified) iteration order.
fn prune_lru_cache<T>(
    cache: &mut HashMap<String, T>,
    max_entries: usize,
    last_seen: impl Fn(&T) -> u64,
) {
    let overflow = match max_entries {
        0 => return,
        limit => cache.len().saturating_sub(limit),
    };
    if overflow == 0 {
        return;
    }

    let mut by_age: Vec<(u64, String)> = Vec::with_capacity(cache.len());
    for (key, value) in cache.iter() {
        by_age.push((last_seen(value), key.clone()));
    }
    // Stable sort: oldest stamps first.
    by_age.sort_by(|a, b| a.0.cmp(&b.0));

    for (_, stale_key) in by_age.drain(..overflow) {
        cache.remove(&stale_key);
    }
}
134
135fn service_layout_signature(mgr: &ServiceConfigManager) -> ServiceLayoutSignature {
136    let mut entries = mgr
137        .stations()
138        .iter()
139        .map(|(station_name, service)| {
140            (
141                station_name.clone(),
142                service
143                    .upstreams
144                    .iter()
145                    .map(|upstream| upstream.base_url.clone())
146                    .collect::<Vec<_>>(),
147            )
148        })
149        .collect::<Vec<_>>();
150    entries.sort_by(|left, right| left.0.cmp(&right.0).then_with(|| left.1.cmp(&right.1)));
151    entries
152}
153
154fn changed_service_layout_stations(
155    previous: &ServiceLayoutSignature,
156    current: &ServiceLayoutSignature,
157) -> HashSet<String> {
158    let previous_by_station = previous
159        .iter()
160        .map(|(station_name, upstreams)| (station_name.as_str(), upstreams.as_slice()))
161        .collect::<HashMap<_, _>>();
162    let current_by_station = current
163        .iter()
164        .map(|(station_name, upstreams)| (station_name.as_str(), upstreams.as_slice()))
165        .collect::<HashMap<_, _>>();
166    let mut changed = HashSet::new();
167
168    for (station_name, upstreams) in previous {
169        if current_by_station
170            .get(station_name.as_str())
171            .is_none_or(|current_upstreams| *current_upstreams != upstreams.as_slice())
172        {
173            changed.insert(station_name.clone());
174        }
175    }
176
177    for (station_name, upstreams) in current {
178        if previous_by_station
179            .get(station_name.as_str())
180            .is_none_or(|previous_upstreams| *previous_upstreams != upstreams.as_slice())
181        {
182            changed.insert(station_name.clone());
183        }
184    }
185
186    changed
187}
188
/// Runtime-only state for the proxy process.
///
/// This state is intentionally not persisted across restarts.
///
/// Each mutable collection sits behind its own `tokio::sync::RwLock`. Operations
/// that touch several of them (for example clearing every manual override of a
/// session) therefore take several locks in turn and are not atomic as a group.
/// The constructors hand instances out as `Arc<ProxyState>`.
#[derive(Debug)]
pub struct ProxyState {
    // Request-id source; `new_with_runtime_policy` initializes it to 1.
    next_request_id: AtomicU64,
    // Manual per-session overrides remain runtime-scoped and expire after inactivity.
    session_override_ttl_ms: u64,
    // Bindings are sticky by default; operators can opt into pruning with a separate TTL.
    session_binding_ttl_ms: u64,
    session_binding_max_entries: usize,
    // Route-affinity TTL (0 = never expire) and LRU cap (0 = unbounded).
    session_route_affinity_ttl_ms: u64,
    session_route_affinity_max_entries: usize,
    // TTL (ms) and size limits for the session cwd and transcript-path caches.
    session_cwd_cache_ttl_ms: u64,
    session_cwd_cache_max_entries: usize,
    session_transcript_path_cache_ttl_ms: u64,
    session_transcript_path_cache_max_entries: usize,
    // Manual per-session overrides, each map keyed by session id.
    session_effort_overrides: RwLock<HashMap<String, SessionEffortOverride>>,
    session_station_overrides: RwLock<HashMap<String, SessionStationOverride>>,
    session_route_target_overrides: RwLock<HashMap<String, SessionRouteTargetOverride>>,
    session_model_overrides: RwLock<HashMap<String, SessionModelOverride>>,
    session_service_tier_overrides: RwLock<HashMap<String, SessionServiceTierOverride>>,
    // Session bindings (e.g. to a profile), keyed by session id.
    session_bindings: RwLock<HashMap<String, SessionBindingEntry>>,
    // Last successful route target per session, keyed by session id.
    session_route_affinities: RwLock<HashMap<String, SessionRouteAffinity>>,
    // Process-wide override slots; `None` when unset.
    global_station_override: RwLock<Option<String>>,
    global_route_target_override: RwLock<Option<String>>,
    // NOTE(review): appears keyed by service name (see `get_runtime_default_profile_override`).
    runtime_default_profiles: RwLock<HashMap<String, RuntimeDefaultProfileOverride>>,
    station_meta_overrides: RwLock<HashMap<String, HashMap<String, ConfigMetaOverride>>>,
    // Primary provider-endpoint overrides keyed by stable provider identity.
    provider_endpoint_meta_overrides:
        RwLock<HashMap<String, HashMap<ProviderEndpointKey, ConfigMetaOverride>>>,
    // Legacy base_url-keyed overrides kept for compatibility with station-oriented callers.
    upstream_meta_overrides: RwLock<HashMap<String, HashMap<String, ConfigMetaOverride>>>,
    session_cwd_cache: RwLock<HashMap<String, SessionCwdCacheEntry>>,
    session_transcript_path_cache: RwLock<HashMap<String, SessionTranscriptPathCacheEntry>>,
    session_stats: RwLock<HashMap<String, SessionStats>>,
    // In-flight requests keyed by request id.
    active_requests: RwLock<HashMap<u64, ActiveRequest>>,
    // NOTE(review): presumably bounded by `recent_finished_max()` — confirm with the writer.
    recent_finished: RwLock<VecDeque<FinishedRequest>>,
    usage_rollups: RwLock<HashMap<String, UsageRollup>>,
    station_health: RwLock<HashMap<String, HashMap<String, StationHealth>>>,
    passive_station_health: RwLock<PassiveStationHealthMap>,
    provider_balances: RwLock<ProviderBalanceMap>,
    provider_balance_summaries: RwLock<ProviderBalanceSummaryMap>,
    provider_endpoint_runtime_health:
        RwLock<HashMap<String, HashMap<ProviderEndpointKey, ProviderEndpointRuntimeHealth>>>,
    station_health_checks: RwLock<HashMap<String, HashMap<String, HealthCheckStatus>>>,
    // Last observed station layout per key (presumably service name), for change detection.
    service_layout_signatures: RwLock<HashMap<String, ServiceLayoutSignature>>,
    // Optional load-balancer state shared with the caller; guarded by a std (blocking) mutex.
    lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
}
238
239impl ProxyState {
    /// Per-station limit on stored health records.
    // NOTE(review): presumably enforced by the health-recording methods later in this impl.
    const MAX_HEALTH_RECORDS_PER_STATION: usize = 200;
241
242    #[allow(dead_code)]
243    pub fn new() -> Arc<Self> {
244        Self::new_with_lb_states(None)
245    }
246
247    pub fn new_with_lb_states(
248        lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
249    ) -> Arc<Self> {
250        let ttl_secs = std::env::var("CODEX_HELPER_SESSION_OVERRIDE_TTL_SECS")
251            .ok()
252            .and_then(|s| s.trim().parse::<u64>().ok())
253            .filter(|&n| n > 0)
254            .unwrap_or(30 * 60);
255        let ttl_ms = ttl_secs.saturating_mul(1000);
256        let binding_ttl_secs = std::env::var("CODEX_HELPER_SESSION_BINDING_TTL_SECS")
257            .ok()
258            .and_then(|s| s.trim().parse::<u64>().ok())
259            .unwrap_or(0);
260        let binding_ttl_ms = binding_ttl_secs.saturating_mul(1000);
261        let binding_max_entries = std::env::var("CODEX_HELPER_SESSION_BINDING_MAX_ENTRIES")
262            .ok()
263            .and_then(|s| s.trim().parse::<usize>().ok())
264            .unwrap_or(2_000);
265        let route_affinity_ttl_secs = std::env::var("CODEX_HELPER_SESSION_ROUTE_AFFINITY_TTL_SECS")
266            .ok()
267            .and_then(|s| s.trim().parse::<u64>().ok())
268            .unwrap_or(0);
269        let route_affinity_ttl_ms = route_affinity_ttl_secs.saturating_mul(1000);
270        let route_affinity_max_entries =
271            std::env::var("CODEX_HELPER_SESSION_ROUTE_AFFINITY_MAX_ENTRIES")
272                .ok()
273                .and_then(|s| s.trim().parse::<usize>().ok())
274                .unwrap_or(5_000);
275
276        let cwd_cache_ttl_secs = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_TTL_SECS")
277            .ok()
278            .and_then(|s| s.trim().parse::<u64>().ok())
279            .unwrap_or(12 * 60 * 60);
280        let cwd_cache_ttl_ms = cwd_cache_ttl_secs.saturating_mul(1000);
281        let cwd_cache_max_entries = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_MAX_ENTRIES")
282            .ok()
283            .and_then(|s| s.trim().parse::<usize>().ok())
284            .unwrap_or(2_000);
285        let transcript_path_cache_ttl_secs =
286            std::env::var("CODEX_HELPER_SESSION_TRANSCRIPT_PATH_CACHE_TTL_SECS")
287                .ok()
288                .and_then(|s| s.trim().parse::<u64>().ok())
289                .unwrap_or(30);
290        let transcript_path_cache_ttl_ms = transcript_path_cache_ttl_secs.saturating_mul(1000);
291        let transcript_path_cache_max_entries =
292            std::env::var("CODEX_HELPER_SESSION_TRANSCRIPT_PATH_CACHE_MAX_ENTRIES")
293                .ok()
294                .and_then(|s| s.trim().parse::<usize>().ok())
295                .unwrap_or(5_000);
296
297        Self::new_with_runtime_policy(
298            lb_states,
299            RuntimePolicy {
300                session_override_ttl_ms: ttl_ms,
301                session_binding_ttl_ms: binding_ttl_ms,
302                session_binding_max_entries: binding_max_entries,
303                session_route_affinity_ttl_ms: route_affinity_ttl_ms,
304                session_route_affinity_max_entries: route_affinity_max_entries,
305                session_cwd_cache_ttl_ms: cwd_cache_ttl_ms,
306                session_cwd_cache_max_entries: cwd_cache_max_entries,
307                session_transcript_path_cache_ttl_ms: transcript_path_cache_ttl_ms,
308                session_transcript_path_cache_max_entries: transcript_path_cache_max_entries,
309            },
310        )
311    }
312
313    fn new_with_runtime_policy(
314        lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
315        policy: RuntimePolicy,
316    ) -> Arc<Self> {
317        Arc::new(Self {
318            next_request_id: AtomicU64::new(1),
319            session_override_ttl_ms: policy.session_override_ttl_ms,
320            session_binding_ttl_ms: policy.session_binding_ttl_ms,
321            session_binding_max_entries: policy.session_binding_max_entries,
322            session_route_affinity_ttl_ms: policy.session_route_affinity_ttl_ms,
323            session_route_affinity_max_entries: policy.session_route_affinity_max_entries,
324            session_cwd_cache_ttl_ms: policy.session_cwd_cache_ttl_ms,
325            session_cwd_cache_max_entries: policy.session_cwd_cache_max_entries,
326            session_transcript_path_cache_ttl_ms: policy.session_transcript_path_cache_ttl_ms,
327            session_transcript_path_cache_max_entries: policy
328                .session_transcript_path_cache_max_entries,
329            session_effort_overrides: RwLock::new(HashMap::new()),
330            session_station_overrides: RwLock::new(HashMap::new()),
331            session_route_target_overrides: RwLock::new(HashMap::new()),
332            session_model_overrides: RwLock::new(HashMap::new()),
333            session_service_tier_overrides: RwLock::new(HashMap::new()),
334            session_bindings: RwLock::new(HashMap::new()),
335            session_route_affinities: RwLock::new(HashMap::new()),
336            global_station_override: RwLock::new(None),
337            global_route_target_override: RwLock::new(None),
338            runtime_default_profiles: RwLock::new(HashMap::new()),
339            station_meta_overrides: RwLock::new(HashMap::new()),
340            provider_endpoint_meta_overrides: RwLock::new(HashMap::new()),
341            upstream_meta_overrides: RwLock::new(HashMap::new()),
342            session_cwd_cache: RwLock::new(HashMap::new()),
343            session_transcript_path_cache: RwLock::new(HashMap::new()),
344            session_stats: RwLock::new(HashMap::new()),
345            active_requests: RwLock::new(HashMap::new()),
346            recent_finished: RwLock::new(VecDeque::new()),
347            usage_rollups: RwLock::new(HashMap::new()),
348            station_health: RwLock::new(HashMap::new()),
349            passive_station_health: RwLock::new(HashMap::new()),
350            provider_balances: RwLock::new(HashMap::new()),
351            provider_balance_summaries: RwLock::new(HashMap::new()),
352            provider_endpoint_runtime_health: RwLock::new(HashMap::new()),
353            station_health_checks: RwLock::new(HashMap::new()),
354            service_layout_signatures: RwLock::new(HashMap::new()),
355            lb_states,
356        })
357    }
358
359    pub async fn get_session_effort_override(&self, session_id: &str) -> Option<String> {
360        let guard = self.session_effort_overrides.read().await;
361        guard.get(session_id).map(|v| v.effort.clone())
362    }
363
364    pub async fn get_session_reasoning_effort_override(&self, session_id: &str) -> Option<String> {
365        self.get_session_effort_override(session_id).await
366    }
367
368    pub async fn set_session_effort_override(
369        &self,
370        session_id: String,
371        effort: String,
372        now_ms: u64,
373    ) {
374        let mut guard = self.session_effort_overrides.write().await;
375        guard.insert(
376            session_id,
377            SessionEffortOverride {
378                effort,
379                updated_at_ms: now_ms,
380                last_seen_ms: now_ms,
381            },
382        );
383    }
384
385    pub async fn set_session_reasoning_effort_override(
386        &self,
387        session_id: String,
388        reasoning_effort: String,
389        now_ms: u64,
390    ) {
391        self.set_session_effort_override(session_id, reasoning_effort, now_ms)
392            .await;
393    }
394
395    pub async fn clear_session_effort_override(&self, session_id: &str) {
396        let mut guard = self.session_effort_overrides.write().await;
397        guard.remove(session_id);
398    }
399
400    pub async fn clear_session_reasoning_effort_override(&self, session_id: &str) {
401        self.clear_session_effort_override(session_id).await;
402    }
403
404    pub async fn list_session_effort_overrides(&self) -> HashMap<String, String> {
405        let guard = self.session_effort_overrides.read().await;
406        guard
407            .iter()
408            .map(|(k, v)| (k.clone(), v.effort.clone()))
409            .collect()
410    }
411
412    pub async fn list_session_reasoning_effort_overrides(&self) -> HashMap<String, String> {
413        self.list_session_effort_overrides().await
414    }
415
416    pub async fn touch_session_override(&self, session_id: &str, now_ms: u64) {
417        let mut guard = self.session_effort_overrides.write().await;
418        if let Some(v) = guard.get_mut(session_id) {
419            v.last_seen_ms = now_ms;
420        }
421    }
422
423    pub async fn touch_session_reasoning_effort_override(&self, session_id: &str, now_ms: u64) {
424        self.touch_session_override(session_id, now_ms).await;
425    }
426
427    pub async fn get_session_station_override(&self, session_id: &str) -> Option<String> {
428        let guard = self.session_station_overrides.read().await;
429        guard.get(session_id).map(|v| v.station_name.clone())
430    }
431
432    pub async fn get_session_route_target_override(&self, session_id: &str) -> Option<String> {
433        let guard = self.session_route_target_overrides.read().await;
434        guard.get(session_id).map(|v| v.target.clone())
435    }
436
437    pub async fn get_session_model_override(&self, session_id: &str) -> Option<String> {
438        let guard = self.session_model_overrides.read().await;
439        guard.get(session_id).map(|v| v.model.clone())
440    }
441
442    pub async fn set_session_model_override(&self, session_id: String, model: String, now_ms: u64) {
443        let mut guard = self.session_model_overrides.write().await;
444        guard.insert(
445            session_id,
446            SessionModelOverride {
447                model,
448                updated_at_ms: now_ms,
449                last_seen_ms: now_ms,
450            },
451        );
452    }
453
454    pub async fn clear_session_model_override(&self, session_id: &str) {
455        let mut guard = self.session_model_overrides.write().await;
456        guard.remove(session_id);
457    }
458
459    pub async fn list_session_model_overrides(&self) -> HashMap<String, String> {
460        let guard = self.session_model_overrides.read().await;
461        guard
462            .iter()
463            .map(|(k, v)| (k.clone(), v.model.clone()))
464            .collect()
465    }
466
467    pub async fn touch_session_model_override(&self, session_id: &str, now_ms: u64) {
468        let mut guard = self.session_model_overrides.write().await;
469        if let Some(v) = guard.get_mut(session_id) {
470            v.last_seen_ms = now_ms;
471        }
472    }
473
474    pub async fn get_session_service_tier_override(&self, session_id: &str) -> Option<String> {
475        let guard = self.session_service_tier_overrides.read().await;
476        guard.get(session_id).map(|v| v.service_tier.clone())
477    }
478
479    pub async fn set_session_service_tier_override(
480        &self,
481        session_id: String,
482        service_tier: String,
483        now_ms: u64,
484    ) {
485        let mut guard = self.session_service_tier_overrides.write().await;
486        guard.insert(
487            session_id,
488            SessionServiceTierOverride {
489                service_tier,
490                updated_at_ms: now_ms,
491                last_seen_ms: now_ms,
492            },
493        );
494    }
495
496    pub async fn clear_session_service_tier_override(&self, session_id: &str) {
497        let mut guard = self.session_service_tier_overrides.write().await;
498        guard.remove(session_id);
499    }
500
501    pub async fn list_session_service_tier_overrides(&self) -> HashMap<String, String> {
502        let guard = self.session_service_tier_overrides.read().await;
503        guard
504            .iter()
505            .map(|(k, v)| (k.clone(), v.service_tier.clone()))
506            .collect()
507    }
508
509    pub async fn touch_session_service_tier_override(&self, session_id: &str, now_ms: u64) {
510        let mut guard = self.session_service_tier_overrides.write().await;
511        if let Some(v) = guard.get_mut(session_id) {
512            v.last_seen_ms = now_ms;
513        }
514    }
515
516    pub async fn get_session_binding(&self, session_id: &str) -> Option<SessionBinding> {
517        let guard = self.session_bindings.read().await;
518        guard.get(session_id).map(|entry| entry.binding.clone())
519    }
520
521    pub async fn list_session_bindings(&self) -> HashMap<String, SessionBinding> {
522        let guard = self.session_bindings.read().await;
523        guard
524            .iter()
525            .map(|(sid, entry)| (sid.clone(), entry.binding.clone()))
526            .collect()
527    }
528
529    pub async fn get_session_route_affinity(
530        &self,
531        session_id: &str,
532    ) -> Option<SessionRouteAffinity> {
533        let mut guard = self.session_route_affinities.write().await;
534        let affinity = guard.get(session_id).cloned()?;
535        if self.session_route_affinity_is_expired(&affinity, unix_now_ms()) {
536            guard.remove(session_id);
537            return None;
538        }
539        Some(affinity)
540    }
541
542    pub async fn list_session_route_affinities(&self) -> HashMap<String, SessionRouteAffinity> {
543        let mut guard = self.session_route_affinities.write().await;
544        let now_ms = unix_now_ms();
545        guard.retain(|_, affinity| !self.session_route_affinity_is_expired(affinity, now_ms));
546        guard.clone()
547    }
548
549    pub async fn record_session_route_affinity_success(
550        &self,
551        session_id: &str,
552        target: SessionRouteAffinityTarget,
553        reason_hint: Option<String>,
554        now_ms: u64,
555    ) -> SessionRouteAffinity {
556        let mut guard = self.session_route_affinities.write().await;
557        let reason = match guard.get_mut(session_id) {
558            Some(existing) if target.same_target(existing) => {
559                existing.last_selected_at_ms = now_ms;
560                return existing.clone();
561            }
562            Some(_) => reason_hint.unwrap_or_else(|| "target_changed".to_string()),
563            None => reason_hint.unwrap_or_else(|| "first_success".to_string()),
564        };
565
566        let affinity = SessionRouteAffinity {
567            route_graph_key: target.route_graph_key,
568            provider_endpoint: target.provider_endpoint,
569            upstream_base_url: target.upstream_base_url,
570            route_path: target.route_path,
571            last_selected_at_ms: now_ms,
572            last_changed_at_ms: now_ms,
573            change_reason: reason,
574        };
575        guard.insert(session_id.to_string(), affinity.clone());
576        prune_lru_cache(
577            &mut guard,
578            self.session_route_affinity_max_entries,
579            |entry| entry.last_selected_at_ms,
580        );
581        affinity
582    }
583
584    fn session_route_affinity_is_expired(
585        &self,
586        affinity: &SessionRouteAffinity,
587        now_ms: u64,
588    ) -> bool {
589        self.session_route_affinity_ttl_ms > 0
590            && now_ms.saturating_sub(affinity.last_selected_at_ms)
591                >= self.session_route_affinity_ttl_ms
592    }
593
594    pub async fn set_session_binding(&self, binding: SessionBinding) {
595        let mut guard = self.session_bindings.write().await;
596        let binding = if let Some(existing) = guard.get(binding.session_id.as_str()) {
597            SessionBinding {
598                created_at_ms: existing.binding.created_at_ms,
599                ..binding
600            }
601        } else {
602            binding
603        };
604        guard.insert(binding.session_id.clone(), SessionBindingEntry { binding });
605    }
606
607    pub async fn clear_session_binding(&self, session_id: &str) {
608        let mut guard = self.session_bindings.write().await;
609        guard.remove(session_id);
610    }
611
612    pub async fn clear_session_manual_overrides(&self, session_id: &str) {
613        self.clear_session_station_override(session_id).await;
614        self.clear_session_route_target_override(session_id).await;
615        self.clear_session_model_override(session_id).await;
616        self.clear_session_effort_override(session_id).await;
617        self.clear_session_service_tier_override(session_id).await;
618    }
619
620    pub async fn get_session_manual_overrides(&self, session_id: &str) -> SessionManualOverrides {
621        let (reasoning_effort, station_name, route_target, model, service_tier) = tokio::join!(
622            self.get_session_reasoning_effort_override(session_id),
623            self.get_session_station_override(session_id),
624            self.get_session_route_target_override(session_id),
625            self.get_session_model_override(session_id),
626            self.get_session_service_tier_override(session_id),
627        );
628
629        SessionManualOverrides {
630            reasoning_effort,
631            station_name,
632            route_target,
633            model,
634            service_tier,
635        }
636    }
637
638    pub async fn list_session_manual_overrides(&self) -> HashMap<String, SessionManualOverrides> {
639        let (reasoning_effort_map, station_map, route_target_map, model_map, service_tier_map) = tokio::join!(
640            self.list_session_reasoning_effort_overrides(),
641            self.list_session_station_overrides(),
642            self.list_session_route_target_overrides(),
643            self.list_session_model_overrides(),
644            self.list_session_service_tier_overrides(),
645        );
646
647        let mut merged = HashMap::<String, SessionManualOverrides>::new();
648        for (session_id, reasoning_effort) in reasoning_effort_map {
649            merged.entry(session_id).or_default().reasoning_effort = Some(reasoning_effort);
650        }
651        for (session_id, station_name) in station_map {
652            merged.entry(session_id).or_default().station_name = Some(station_name);
653        }
654        for (session_id, route_target) in route_target_map {
655            merged.entry(session_id).or_default().route_target = Some(route_target);
656        }
657        for (session_id, model) in model_map {
658            merged.entry(session_id).or_default().model = Some(model);
659        }
660        for (session_id, service_tier) in service_tier_map {
661            merged.entry(session_id).or_default().service_tier = Some(service_tier);
662        }
663        merged.retain(|_, overrides| !overrides.is_empty());
664        merged
665    }
666
667    pub async fn apply_session_profile_binding(
668        &self,
669        service_name: &str,
670        mgr: &ServiceConfigManager,
671        session_id: String,
672        profile_name: String,
673        now_ms: u64,
674    ) -> anyhow::Result<()> {
675        let profile = crate::config::resolve_service_profile(mgr, profile_name.as_str())?;
676        crate::config::validate_profile_station_compatibility(
677            service_name,
678            mgr,
679            profile_name.as_str(),
680            &profile,
681        )?;
682
683        self.set_session_binding(SessionBinding {
684            session_id: session_id.clone(),
685            profile_name: Some(profile_name),
686            station_name: profile.station.clone(),
687            model: profile.model.clone(),
688            reasoning_effort: profile.reasoning_effort.clone(),
689            service_tier: profile.service_tier.clone(),
690            continuity_mode: SessionContinuityMode::ManualProfile,
691            created_at_ms: now_ms,
692            updated_at_ms: now_ms,
693            last_seen_ms: now_ms,
694        })
695        .await;
696        self.clear_session_manual_overrides(session_id.as_str())
697            .await;
698        Ok(())
699    }
700
701    pub async fn touch_session_binding(&self, session_id: &str, now_ms: u64) {
702        let mut guard = self.session_bindings.write().await;
703        if let Some(entry) = guard.get_mut(session_id) {
704            entry.binding.last_seen_ms = now_ms;
705        }
706    }
707
708    pub async fn set_session_station_override(
709        &self,
710        session_id: String,
711        station_name: String,
712        now_ms: u64,
713    ) {
714        let mut guard = self.session_station_overrides.write().await;
715        guard.insert(
716            session_id,
717            SessionStationOverride {
718                station_name,
719                updated_at_ms: now_ms,
720                last_seen_ms: now_ms,
721            },
722        );
723    }
724
725    pub async fn set_session_route_target_override(
726        &self,
727        session_id: String,
728        target: String,
729        now_ms: u64,
730    ) {
731        let mut guard = self.session_route_target_overrides.write().await;
732        guard.insert(
733            session_id,
734            SessionRouteTargetOverride {
735                target,
736                updated_at_ms: now_ms,
737                last_seen_ms: now_ms,
738            },
739        );
740    }
741
742    pub async fn clear_session_station_override(&self, session_id: &str) {
743        let mut guard = self.session_station_overrides.write().await;
744        guard.remove(session_id);
745    }
746
747    pub async fn clear_session_route_target_override(&self, session_id: &str) {
748        let mut guard = self.session_route_target_overrides.write().await;
749        guard.remove(session_id);
750    }
751
752    pub async fn list_session_station_overrides(&self) -> HashMap<String, String> {
753        let guard = self.session_station_overrides.read().await;
754        guard
755            .iter()
756            .map(|(k, v)| (k.clone(), v.station_name.clone()))
757            .collect()
758    }
759
760    pub async fn list_session_route_target_overrides(&self) -> HashMap<String, String> {
761        let guard = self.session_route_target_overrides.read().await;
762        guard
763            .iter()
764            .map(|(k, v)| (k.clone(), v.target.clone()))
765            .collect()
766    }
767
768    pub async fn touch_session_station_override(&self, session_id: &str, now_ms: u64) {
769        let mut guard = self.session_station_overrides.write().await;
770        if let Some(v) = guard.get_mut(session_id) {
771            v.last_seen_ms = now_ms;
772        }
773    }
774
775    pub async fn touch_session_route_target_override(&self, session_id: &str, now_ms: u64) {
776        let mut guard = self.session_route_target_overrides.write().await;
777        if let Some(v) = guard.get_mut(session_id) {
778            v.last_seen_ms = now_ms;
779        }
780    }
781
782    pub async fn get_global_station_override(&self) -> Option<String> {
783        let guard = self.global_station_override.read().await;
784        guard.clone()
785    }
786
787    pub async fn get_global_route_target_override(&self) -> Option<String> {
788        let guard = self.global_route_target_override.read().await;
789        guard.clone()
790    }
791
792    pub async fn set_global_station_override(&self, station_name: String, _now_ms: u64) {
793        let mut guard = self.global_station_override.write().await;
794        *guard = Some(station_name);
795    }
796
797    pub async fn set_global_route_target_override(&self, target: String, _now_ms: u64) {
798        let mut guard = self.global_route_target_override.write().await;
799        *guard = Some(target);
800    }
801
802    pub async fn clear_global_station_override(&self) {
803        let mut guard = self.global_station_override.write().await;
804        *guard = None;
805    }
806
807    pub async fn clear_global_route_target_override(&self) {
808        let mut guard = self.global_route_target_override.write().await;
809        *guard = None;
810    }
811
812    pub async fn get_runtime_default_profile_override(&self, service_name: &str) -> Option<String> {
813        let guard = self.runtime_default_profiles.read().await;
814        guard
815            .get(service_name)
816            .map(|entry| entry.profile_name.clone())
817    }
818
819    pub async fn set_runtime_default_profile_override(
820        &self,
821        service_name: String,
822        profile_name: String,
823        now_ms: u64,
824    ) {
825        let mut guard = self.runtime_default_profiles.write().await;
826        guard.insert(
827            service_name,
828            RuntimeDefaultProfileOverride {
829                profile_name,
830                updated_at_ms: now_ms,
831            },
832        );
833    }
834
835    pub async fn clear_runtime_default_profile_override(&self, service_name: &str) {
836        let mut guard = self.runtime_default_profiles.write().await;
837        guard.remove(service_name);
838    }
839
840    pub async fn set_station_enabled_override(
841        &self,
842        service_name: &str,
843        station_name: String,
844        enabled: bool,
845        now_ms: u64,
846    ) {
847        let mut guard = self.station_meta_overrides.write().await;
848        let per_service = guard.entry(service_name.to_string()).or_default();
849        let entry = per_service.entry(station_name).or_default();
850        entry.enabled = Some(enabled);
851        entry.updated_at_ms = now_ms;
852    }
853
854    pub async fn set_station_level_override(
855        &self,
856        service_name: &str,
857        station_name: String,
858        level: u8,
859        now_ms: u64,
860    ) {
861        let mut guard = self.station_meta_overrides.write().await;
862        let per_service = guard.entry(service_name.to_string()).or_default();
863        let entry = per_service.entry(station_name).or_default();
864        entry.level = Some(level.clamp(1, 10));
865        entry.updated_at_ms = now_ms;
866    }
867
868    pub async fn set_station_runtime_state_override(
869        &self,
870        service_name: &str,
871        station_name: String,
872        state: RuntimeConfigState,
873        now_ms: u64,
874    ) {
875        let mut guard = self.station_meta_overrides.write().await;
876        let per_service = guard.entry(service_name.to_string()).or_default();
877        let entry = per_service.entry(station_name).or_default();
878        entry.state = Some(state);
879        entry.updated_at_ms = now_ms;
880    }
881
882    pub async fn clear_station_enabled_override(&self, service_name: &str, station_name: &str) {
883        let mut guard = self.station_meta_overrides.write().await;
884        let Some(per_service) = guard.get_mut(service_name) else {
885            return;
886        };
887        let Some(entry) = per_service.get_mut(station_name) else {
888            return;
889        };
890        entry.enabled = None;
891        if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
892            per_service.remove(station_name);
893        }
894        if per_service.is_empty() {
895            guard.remove(service_name);
896        }
897    }
898
899    pub async fn clear_station_level_override(&self, service_name: &str, station_name: &str) {
900        let mut guard = self.station_meta_overrides.write().await;
901        let Some(per_service) = guard.get_mut(service_name) else {
902            return;
903        };
904        let Some(entry) = per_service.get_mut(station_name) else {
905            return;
906        };
907        entry.level = None;
908        if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
909            per_service.remove(station_name);
910        }
911        if per_service.is_empty() {
912            guard.remove(service_name);
913        }
914    }
915
916    pub async fn clear_station_runtime_state_override(
917        &self,
918        service_name: &str,
919        station_name: &str,
920    ) {
921        let mut guard = self.station_meta_overrides.write().await;
922        let Some(per_service) = guard.get_mut(service_name) else {
923            return;
924        };
925        let Some(entry) = per_service.get_mut(station_name) else {
926            return;
927        };
928        entry.state = None;
929        if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
930            per_service.remove(station_name);
931        }
932        if per_service.is_empty() {
933            guard.remove(service_name);
934        }
935    }
936
937    pub async fn get_station_meta_overrides(
938        &self,
939        service_name: &str,
940    ) -> HashMap<String, (Option<bool>, Option<u8>)> {
941        let guard = self.station_meta_overrides.read().await;
942        guard
943            .get(service_name)
944            .map(|m| {
945                m.iter()
946                    .map(|(k, v)| (k.clone(), (v.enabled, v.level)))
947                    .collect::<HashMap<_, _>>()
948            })
949            .unwrap_or_default()
950    }
951
952    pub async fn get_station_runtime_state_overrides(
953        &self,
954        service_name: &str,
955    ) -> HashMap<String, RuntimeConfigState> {
956        let guard = self.station_meta_overrides.read().await;
957        guard
958            .get(service_name)
959            .map(|m| {
960                m.iter()
961                    .filter_map(|(k, v)| v.state.map(|state| (k.clone(), state)))
962                    .collect::<HashMap<_, _>>()
963            })
964            .unwrap_or_default()
965    }
966
967    pub async fn set_provider_endpoint_enabled_override(
968        &self,
969        service_name: &str,
970        endpoint_key: ProviderEndpointKey,
971        enabled: bool,
972        now_ms: u64,
973    ) {
974        let mut guard = self.provider_endpoint_meta_overrides.write().await;
975        let per_service = guard.entry(service_name.to_string()).or_default();
976        let entry = per_service.entry(endpoint_key).or_default();
977        entry.enabled = Some(enabled);
978        entry.updated_at_ms = now_ms;
979    }
980
981    pub async fn clear_provider_endpoint_enabled_override(
982        &self,
983        service_name: &str,
984        endpoint_key: &ProviderEndpointKey,
985    ) {
986        let mut guard = self.provider_endpoint_meta_overrides.write().await;
987        let Some(per_service) = guard.get_mut(service_name) else {
988            return;
989        };
990        let Some(entry) = per_service.get_mut(endpoint_key) else {
991            return;
992        };
993        entry.enabled = None;
994        if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
995            per_service.remove(endpoint_key);
996        }
997        if per_service.is_empty() {
998            guard.remove(service_name);
999        }
1000    }
1001
1002    pub async fn set_provider_endpoint_runtime_state_override(
1003        &self,
1004        service_name: &str,
1005        endpoint_key: ProviderEndpointKey,
1006        state: RuntimeConfigState,
1007        now_ms: u64,
1008    ) {
1009        let mut guard = self.provider_endpoint_meta_overrides.write().await;
1010        let per_service = guard.entry(service_name.to_string()).or_default();
1011        let entry = per_service.entry(endpoint_key).or_default();
1012        entry.state = Some(state);
1013        entry.updated_at_ms = now_ms;
1014    }
1015
1016    pub async fn clear_provider_endpoint_runtime_state_override(
1017        &self,
1018        service_name: &str,
1019        endpoint_key: &ProviderEndpointKey,
1020    ) {
1021        let mut guard = self.provider_endpoint_meta_overrides.write().await;
1022        let Some(per_service) = guard.get_mut(service_name) else {
1023            return;
1024        };
1025        let Some(entry) = per_service.get_mut(endpoint_key) else {
1026            return;
1027        };
1028        entry.state = None;
1029        if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
1030            per_service.remove(endpoint_key);
1031        }
1032        if per_service.is_empty() {
1033            guard.remove(service_name);
1034        }
1035    }
1036
1037    pub async fn set_upstream_enabled_override(
1038        &self,
1039        service_name: &str,
1040        base_url: String,
1041        enabled: bool,
1042        now_ms: u64,
1043    ) {
1044        let mut guard = self.upstream_meta_overrides.write().await;
1045        let per_service = guard.entry(service_name.to_string()).or_default();
1046        let entry = per_service.entry(base_url).or_default();
1047        entry.enabled = Some(enabled);
1048        entry.updated_at_ms = now_ms;
1049    }
1050
1051    pub async fn clear_upstream_enabled_override(&self, service_name: &str, base_url: &str) {
1052        let mut guard = self.upstream_meta_overrides.write().await;
1053        let Some(per_service) = guard.get_mut(service_name) else {
1054            return;
1055        };
1056        let Some(entry) = per_service.get_mut(base_url) else {
1057            return;
1058        };
1059        entry.enabled = None;
1060        if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
1061            per_service.remove(base_url);
1062        }
1063        if per_service.is_empty() {
1064            guard.remove(service_name);
1065        }
1066    }
1067
1068    pub async fn set_upstream_runtime_state_override(
1069        &self,
1070        service_name: &str,
1071        base_url: String,
1072        state: RuntimeConfigState,
1073        now_ms: u64,
1074    ) {
1075        let mut guard = self.upstream_meta_overrides.write().await;
1076        let per_service = guard.entry(service_name.to_string()).or_default();
1077        let entry = per_service.entry(base_url).or_default();
1078        entry.state = Some(state);
1079        entry.updated_at_ms = now_ms;
1080    }
1081
1082    pub async fn clear_upstream_runtime_state_override(&self, service_name: &str, base_url: &str) {
1083        let mut guard = self.upstream_meta_overrides.write().await;
1084        let Some(per_service) = guard.get_mut(service_name) else {
1085            return;
1086        };
1087        let Some(entry) = per_service.get_mut(base_url) else {
1088            return;
1089        };
1090        entry.state = None;
1091        if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
1092            per_service.remove(base_url);
1093        }
1094        if per_service.is_empty() {
1095            guard.remove(service_name);
1096        }
1097    }
1098
1099    pub async fn get_upstream_meta_overrides(
1100        &self,
1101        service_name: &str,
1102    ) -> HashMap<String, (Option<bool>, Option<RuntimeConfigState>)> {
1103        let mut overrides = HashMap::new();
1104
1105        {
1106            let guard = self.upstream_meta_overrides.read().await;
1107            if let Some(per_service) = guard.get(service_name) {
1108                overrides.extend(
1109                    per_service
1110                        .iter()
1111                        .map(|(k, v)| (k.clone(), (v.enabled, v.state))),
1112                );
1113            }
1114        }
1115
1116        {
1117            let guard = self.provider_endpoint_meta_overrides.read().await;
1118            if let Some(per_service) = guard.get(service_name) {
1119                overrides.extend(
1120                    per_service
1121                        .iter()
1122                        .map(|(k, v)| (k.stable_key(), (v.enabled, v.state))),
1123                );
1124            }
1125        }
1126
1127        overrides
1128    }
1129
1130    pub async fn route_plan_runtime_state_for_provider_endpoints(
1131        &self,
1132        service_name: &str,
1133    ) -> RoutePlanRuntimeState {
1134        let mut runtime = RoutePlanRuntimeState::default();
1135        let now = std::time::Instant::now();
1136
1137        {
1138            let mut guard = self.provider_endpoint_runtime_health.write().await;
1139            if let Some(per_service) = guard.get_mut(service_name) {
1140                let mut affinity: Option<(ProviderEndpointKey, u64)> = None;
1141                for (endpoint_key, health) in per_service.iter_mut() {
1142                    if health.cooldown_until.is_some_and(|until| now >= until) {
1143                        health.failure_count = 0;
1144                        health.cooldown_until = None;
1145                    }
1146                    let cooldown_active = health.cooldown_until.is_some_and(|until| now < until);
1147                    runtime.set_provider_endpoint(
1148                        endpoint_key.clone(),
1149                        RoutePlanUpstreamRuntimeState {
1150                            runtime_disabled: false,
1151                            failure_count: health.failure_count,
1152                            cooldown_active,
1153                            usage_exhausted: health.usage_exhausted,
1154                            missing_auth: false,
1155                        },
1156                    );
1157                    if let Some(last_good_at_ms) = health.last_good_at_ms
1158                        && affinity
1159                            .as_ref()
1160                            .is_none_or(|(_, current)| last_good_at_ms > *current)
1161                    {
1162                        affinity = Some((endpoint_key.clone(), last_good_at_ms));
1163                    }
1164                }
1165                if let Some((endpoint_key, _)) = affinity {
1166                    runtime.set_affinity_provider_endpoint(Some(endpoint_key));
1167                }
1168            }
1169        }
1170
1171        {
1172            let guard = self.provider_endpoint_meta_overrides.read().await;
1173            if let Some(per_service) = guard.get(service_name) {
1174                for (endpoint_key, meta) in per_service {
1175                    let mut upstream_state = runtime.provider_endpoint(endpoint_key);
1176                    if meta.enabled == Some(false)
1177                        || meta
1178                            .state
1179                            .is_some_and(|state| state != RuntimeConfigState::Normal)
1180                    {
1181                        upstream_state.runtime_disabled = true;
1182                    }
1183                    runtime.set_provider_endpoint(endpoint_key.clone(), upstream_state);
1184                }
1185            }
1186        }
1187
1188        runtime
1189    }
1190
1191    pub async fn record_provider_endpoint_attempt_success(
1192        &self,
1193        service_name: &str,
1194        endpoint_key: ProviderEndpointKey,
1195        now_ms: u64,
1196    ) {
1197        let mut guard = self.provider_endpoint_runtime_health.write().await;
1198        let entry = guard
1199            .entry(service_name.to_string())
1200            .or_default()
1201            .entry(endpoint_key)
1202            .or_default();
1203        entry.failure_count = 0;
1204        entry.cooldown_until = None;
1205        entry.penalty_streak = 0;
1206        entry.last_good_at_ms = Some(now_ms);
1207    }
1208
1209    pub async fn record_provider_endpoint_attempt_failure(
1210        &self,
1211        service_name: &str,
1212        endpoint_key: ProviderEndpointKey,
1213        failure_threshold_cooldown_secs: u64,
1214        cooldown_backoff: CooldownBackoff,
1215    ) {
1216        let mut guard = self.provider_endpoint_runtime_health.write().await;
1217        let entry = guard
1218            .entry(service_name.to_string())
1219            .or_default()
1220            .entry(endpoint_key)
1221            .or_default();
1222
1223        entry.failure_count = entry.failure_count.saturating_add(1);
1224        if entry.failure_count >= FAILURE_THRESHOLD {
1225            let base_secs = if failure_threshold_cooldown_secs == 0 {
1226                COOLDOWN_SECS
1227            } else {
1228                failure_threshold_cooldown_secs
1229            };
1230            let effective_secs =
1231                cooldown_backoff.effective_cooldown_secs(base_secs, entry.penalty_streak);
1232            let now = std::time::Instant::now();
1233            let new_until = now + std::time::Duration::from_secs(effective_secs);
1234            if entry
1235                .cooldown_until
1236                .is_none_or(|existing| new_until > existing)
1237            {
1238                entry.cooldown_until = Some(new_until);
1239            }
1240            entry.penalty_streak = entry.penalty_streak.saturating_add(1);
1241            entry.last_good_at_ms = None;
1242        }
1243    }
1244
1245    pub async fn penalize_provider_endpoint_attempt(
1246        &self,
1247        service_name: &str,
1248        endpoint_key: ProviderEndpointKey,
1249        cooldown_secs: u64,
1250        cooldown_backoff: CooldownBackoff,
1251    ) {
1252        let mut guard = self.provider_endpoint_runtime_health.write().await;
1253        let entry = guard
1254            .entry(service_name.to_string())
1255            .or_default()
1256            .entry(endpoint_key)
1257            .or_default();
1258        let effective_secs =
1259            cooldown_backoff.effective_cooldown_secs(cooldown_secs, entry.penalty_streak);
1260        entry.failure_count = FAILURE_THRESHOLD;
1261        entry.cooldown_until =
1262            Some(std::time::Instant::now() + std::time::Duration::from_secs(effective_secs));
1263        entry.penalty_streak = entry.penalty_streak.saturating_add(1);
1264        entry.last_good_at_ms = None;
1265    }
1266
1267    pub async fn set_provider_endpoint_usage_exhausted(
1268        &self,
1269        service_name: &str,
1270        endpoint_key: ProviderEndpointKey,
1271        exhausted: bool,
1272    ) {
1273        let mut guard = self.provider_endpoint_runtime_health.write().await;
1274        let entry = guard
1275            .entry(service_name.to_string())
1276            .or_default()
1277            .entry(endpoint_key)
1278            .or_default();
1279        entry.usage_exhausted = exhausted;
1280    }
1281
1282    pub async fn prune_runtime_observability_for_service(
1283        &self,
1284        service_name: &str,
1285        mgr: &ServiceConfigManager,
1286    ) {
1287        let active_stations = mgr.stations().keys().cloned().collect::<HashSet<_>>();
1288        let active_upstreams = mgr
1289            .stations()
1290            .iter()
1291            .map(|(station_name, service)| {
1292                (
1293                    station_name.clone(),
1294                    service
1295                        .upstreams
1296                        .iter()
1297                        .map(|upstream| upstream.base_url.clone())
1298                        .collect::<HashSet<_>>(),
1299                )
1300            })
1301            .collect::<HashMap<_, _>>();
1302        let active_base_urls = active_upstreams
1303            .values()
1304            .flat_map(|upstreams| upstreams.iter().cloned())
1305            .collect::<HashSet<_>>();
1306        let active_provider_endpoint_keys = mgr
1307            .stations()
1308            .iter()
1309            .flat_map(|(station_name, service)| {
1310                service.upstreams.iter().enumerate().map(|(idx, upstream)| {
1311                    Self::active_provider_endpoint_key_for_upstream(
1312                        service_name,
1313                        station_name.as_str(),
1314                        idx,
1315                        upstream,
1316                    )
1317                })
1318            })
1319            .collect::<HashSet<_>>();
1320        let mut active_provider_ids = HashSet::from(["-".to_string()]);
1321        for service in mgr.stations().values() {
1322            for upstream in &service.upstreams {
1323                if let Some(provider_id) = upstream.tags.get("provider_id") {
1324                    active_provider_ids.insert(provider_id.clone());
1325                }
1326            }
1327        }
1328
1329        let layout = service_layout_signature(mgr);
1330        let balance_prune_stations = {
1331            let mut signatures = self.service_layout_signatures.write().await;
1332            let changed = signatures.get(service_name).map_or_else(
1333                || {
1334                    if active_stations.len() == 1 && active_stations.contains("routing") {
1335                        Some(HashSet::from(["routing".to_string()]))
1336                    } else {
1337                        None
1338                    }
1339                },
1340                |previous| Some(changed_service_layout_stations(previous, &layout)),
1341            );
1342            signatures.insert(service_name.to_string(), layout);
1343            changed
1344        };
1345
1346        match balance_prune_stations {
1347            Some(changed_layout_stations) if !changed_layout_stations.is_empty() => {
1348                let mut provider_balances = self.provider_balances.write().await;
1349                if let Some(per_service) = provider_balances.get_mut(service_name) {
1350                    per_service
1351                        .retain(|station_name, _| !changed_layout_stations.contains(station_name));
1352                    if per_service.is_empty() {
1353                        provider_balances.remove(service_name);
1354                    }
1355                }
1356                let mut provider_balance_summaries = self.provider_balance_summaries.write().await;
1357                if let Some(per_service) = provider_balance_summaries.get_mut(service_name) {
1358                    per_service
1359                        .retain(|station_name, _| !changed_layout_stations.contains(station_name));
1360                    if per_service.is_empty() {
1361                        provider_balance_summaries.remove(service_name);
1362                    }
1363                }
1364            }
1365            None => {
1366                let mut provider_balances = self.provider_balances.write().await;
1367                provider_balances.remove(service_name);
1368                let mut provider_balance_summaries = self.provider_balance_summaries.write().await;
1369                provider_balance_summaries.remove(service_name);
1370            }
1371            Some(_) => {}
1372        }
1373
1374        {
1375            let mut guard = self.station_meta_overrides.write().await;
1376            if let Some(per_service) = guard.get_mut(service_name) {
1377                per_service.retain(|station_name, _| active_stations.contains(station_name));
1378                if per_service.is_empty() {
1379                    guard.remove(service_name);
1380                }
1381            }
1382        }
1383
1384        {
1385            let mut guard = self.upstream_meta_overrides.write().await;
1386            if let Some(per_service) = guard.get_mut(service_name) {
1387                per_service.retain(|base_url, _| active_base_urls.contains(base_url));
1388                if per_service.is_empty() {
1389                    guard.remove(service_name);
1390                }
1391            }
1392        }
1393
1394        {
1395            let mut guard = self.provider_endpoint_meta_overrides.write().await;
1396            if let Some(per_service) = guard.get_mut(service_name) {
1397                per_service
1398                    .retain(|endpoint_key, _| active_provider_endpoint_keys.contains(endpoint_key));
1399                if per_service.is_empty() {
1400                    guard.remove(service_name);
1401                }
1402            }
1403        }
1404
1405        {
1406            let mut guard = self.provider_endpoint_runtime_health.write().await;
1407            if let Some(per_service) = guard.get_mut(service_name) {
1408                per_service
1409                    .retain(|endpoint_key, _| active_provider_endpoint_keys.contains(endpoint_key));
1410                if per_service.is_empty() {
1411                    guard.remove(service_name);
1412                }
1413            }
1414        }
1415
1416        {
1417            let mut guard = self.station_health.write().await;
1418            if let Some(per_service) = guard.get_mut(service_name) {
1419                per_service.retain(|station_name, station_health| {
1420                    if !active_stations.contains(station_name) {
1421                        return false;
1422                    }
1423                    if let Some(allowed_upstreams) = active_upstreams.get(station_name) {
1424                        station_health
1425                            .upstreams
1426                            .retain(|upstream| allowed_upstreams.contains(&upstream.base_url));
1427                    }
1428                    !station_health.upstreams.is_empty()
1429                });
1430                if per_service.is_empty() {
1431                    guard.remove(service_name);
1432                }
1433            }
1434        }
1435
1436        {
1437            let mut guard = self.passive_station_health.write().await;
1438            if let Some(per_service) = guard.get_mut(service_name) {
1439                per_service.retain(|station_name, station_health| {
1440                    if !active_stations.contains(station_name) {
1441                        return false;
1442                    }
1443                    if let Some(allowed_upstreams) = active_upstreams.get(station_name) {
1444                        station_health.retain(|base_url, _| allowed_upstreams.contains(base_url));
1445                    }
1446                    !station_health.is_empty()
1447                });
1448                if per_service.is_empty() {
1449                    guard.remove(service_name);
1450                }
1451            }
1452        }
1453
1454        {
1455            let mut guard = self.station_health_checks.write().await;
1456            if let Some(per_service) = guard.get_mut(service_name) {
1457                per_service.retain(|station_name, _| active_stations.contains(station_name));
1458                if per_service.is_empty() {
1459                    guard.remove(service_name);
1460                }
1461            }
1462        }
1463
1464        {
1465            let mut guard = self.usage_rollups.write().await;
1466            if let Some(rollup) = guard.get_mut(service_name) {
1467                rollup
1468                    .by_config
1469                    .retain(|station_name, _| active_stations.contains(station_name));
1470                rollup.by_config_day.retain(|station_name, _day_map| {
1471                    if !active_stations.contains(station_name) {
1472                        return false;
1473                    }
1474                    true
1475                });
1476                rollup
1477                    .by_provider
1478                    .retain(|provider_id, _| active_provider_ids.contains(provider_id));
1479                rollup.by_provider_day.retain(|provider_id, _day_map| {
1480                    if !active_provider_ids.contains(provider_id) {
1481                        return false;
1482                    }
1483                    true
1484                });
1485            }
1486        }
1487    }
1488
1489    fn active_provider_endpoint_key_for_upstream(
1490        service_name: &str,
1491        station_name: &str,
1492        upstream_index: usize,
1493        upstream: &crate::config::UpstreamConfig,
1494    ) -> ProviderEndpointKey {
1495        let provider_id = upstream
1496            .tags
1497            .get("provider_id")
1498            .cloned()
1499            .unwrap_or_else(|| format!("{station_name}#{upstream_index}"));
1500        let endpoint_id = upstream
1501            .tags
1502            .get("endpoint_id")
1503            .cloned()
1504            .unwrap_or_else(|| upstream_index.to_string());
1505
1506        ProviderEndpointKey::new(service_name, provider_id, endpoint_id)
1507    }
1508
1509    pub async fn record_station_health(
1510        &self,
1511        service_name: &str,
1512        station_name: String,
1513        health: StationHealth,
1514    ) {
1515        let mut guard = self.station_health.write().await;
1516        let per_service = guard.entry(service_name.to_string()).or_default();
1517        per_service.insert(station_name, health);
1518    }
1519
1520    pub async fn get_station_health(&self, service_name: &str) -> HashMap<String, StationHealth> {
1521        let active = {
1522            let guard = self.station_health.read().await;
1523            guard.get(service_name).cloned().unwrap_or_default()
1524        };
1525        let passive = {
1526            let guard = self.passive_station_health.read().await;
1527            guard.get(service_name).cloned().unwrap_or_default()
1528        };
1529        merge_station_health(active, passive)
1530    }
1531
1532    pub async fn record_provider_balance_snapshot(
1533        &self,
1534        service_name: &str,
1535        mut snapshot: ProviderBalanceSnapshot,
1536    ) {
1537        let (Some(station_name), Some(upstream_index)) =
1538            (snapshot.station_name.clone(), snapshot.upstream_index)
1539        else {
1540            return;
1541        };
1542        let now_ms = unix_now_ms();
1543        snapshot.refresh_status(now_ms);
1544
1545        let station_summary = {
1546            let mut guard = self.provider_balances.write().await;
1547            let station_balances = guard
1548                .entry(service_name.to_string())
1549                .or_default()
1550                .entry(station_name.clone())
1551                .or_default();
1552            station_balances
1553                .entry(upstream_index)
1554                .or_default()
1555                .insert(snapshot.provider_id.clone(), snapshot);
1556            StationRoutingBalanceSummary::from_snapshot_iter_at(
1557                station_balances
1558                    .values()
1559                    .flat_map(|providers| providers.values()),
1560                now_ms,
1561            )
1562        };
1563
1564        let mut summaries = self.provider_balance_summaries.write().await;
1565        summaries
1566            .entry(service_name.to_string())
1567            .or_default()
1568            .insert(station_name, station_summary);
1569    }
1570
1571    pub async fn get_provider_balance_view(
1572        &self,
1573        service_name: &str,
1574    ) -> HashMap<String, Vec<ProviderBalanceSnapshot>> {
1575        let now_ms = unix_now_ms();
1576        let guard = self.provider_balances.read().await;
1577        let Some(per_service) = guard.get(service_name) else {
1578            return HashMap::new();
1579        };
1580
1581        per_service
1582            .iter()
1583            .map(|(station_name, upstreams)| {
1584                let mut snapshots = upstreams
1585                    .values()
1586                    .flat_map(|providers| providers.values().cloned())
1587                    .collect::<Vec<_>>();
1588                for snapshot in &mut snapshots {
1589                    snapshot.refresh_status(now_ms);
1590                }
1591                snapshots.sort_by(|a, b| {
1592                    a.upstream_index
1593                        .cmp(&b.upstream_index)
1594                        .then_with(|| a.provider_id.cmp(&b.provider_id))
1595                });
1596                (station_name.clone(), snapshots)
1597            })
1598            .collect()
1599    }
1600
1601    pub async fn get_provider_balance_summary_view(
1602        &self,
1603        service_name: &str,
1604    ) -> HashMap<String, StationRoutingBalanceSummary> {
1605        let guard = self.provider_balance_summaries.read().await;
1606        let Some(per_service) = guard.get(service_name) else {
1607            return HashMap::new();
1608        };
1609
1610        per_service.clone()
1611    }
1612
1613    pub async fn record_passive_upstream_success(
1614        &self,
1615        service_name: &str,
1616        station_name: &str,
1617        base_url: &str,
1618        status_code: Option<u16>,
1619        now_ms: u64,
1620    ) {
1621        let mut guard = self.passive_station_health.write().await;
1622        let entry = guard
1623            .entry(service_name.to_string())
1624            .or_default()
1625            .entry(station_name.to_string())
1626            .or_default()
1627            .entry(base_url.to_string())
1628            .or_default();
1629        entry.record_success(now_ms, status_code);
1630    }
1631
1632    pub async fn record_passive_upstream_failure(&self, params: PassiveUpstreamFailureRecord) {
1633        let PassiveUpstreamFailureRecord {
1634            service_name,
1635            station_name,
1636            base_url,
1637            status_code,
1638            error_class,
1639            error,
1640            now_ms,
1641        } = params;
1642
1643        let mut guard = self.passive_station_health.write().await;
1644        let entry = guard
1645            .entry(service_name)
1646            .or_default()
1647            .entry(station_name)
1648            .or_default()
1649            .entry(base_url)
1650            .or_default();
1651        entry.record_failure(now_ms, status_code, error_class, error);
1652    }
1653
1654    pub async fn get_lb_view(&self) -> HashMap<String, LbConfigView> {
1655        let Some(lb_states) = self.lb_states.as_ref() else {
1656            return HashMap::new();
1657        };
1658        let mut map = match lb_states.lock() {
1659            Ok(m) => m,
1660            Err(e) => e.into_inner(),
1661        };
1662
1663        let now = std::time::Instant::now();
1664        let mut out = HashMap::new();
1665        for (cfg_name, st) in map.iter_mut() {
1666            let len = st
1667                .failure_counts
1668                .len()
1669                .max(st.cooldown_until.len())
1670                .max(st.usage_exhausted.len());
1671            if len == 0 {
1672                continue;
1673            }
1674
1675            // 如果结构变化导致长度不一致,做一次对齐,避免 UI 读到越界/脏数据。
1676            if st.failure_counts.len() != len {
1677                st.failure_counts.resize(len, 0);
1678            }
1679            if st.cooldown_until.len() != len {
1680                st.cooldown_until.resize(len, None);
1681            }
1682            if st.usage_exhausted.len() != len {
1683                st.usage_exhausted.resize(len, false);
1684            }
1685
1686            let mut upstreams = Vec::with_capacity(len);
1687            for idx in 0..len {
1688                let failure_count = st.failure_counts.get(idx).copied().unwrap_or(0);
1689                let cooldown_remaining_secs = st
1690                    .cooldown_until
1691                    .get(idx)
1692                    .and_then(|v| *v)
1693                    .map(|until| until.saturating_duration_since(now).as_secs())
1694                    .filter(|&s| s > 0);
1695                let usage_exhausted = st.usage_exhausted.get(idx).copied().unwrap_or(false);
1696                upstreams.push(LbUpstreamView {
1697                    failure_count,
1698                    cooldown_remaining_secs,
1699                    usage_exhausted,
1700                });
1701            }
1702
1703            out.insert(
1704                cfg_name.clone(),
1705                LbConfigView {
1706                    last_good_index: st.last_good_index,
1707                    upstreams,
1708                },
1709            );
1710        }
1711        out
1712    }
1713
1714    pub async fn list_health_checks(
1715        &self,
1716        service_name: &str,
1717    ) -> HashMap<String, HealthCheckStatus> {
1718        let guard = self.station_health_checks.read().await;
1719        guard.get(service_name).cloned().unwrap_or_default()
1720    }
1721
1722    pub async fn try_begin_station_health_check(
1723        &self,
1724        service_name: &str,
1725        station_name: &str,
1726        total: usize,
1727        now_ms: u64,
1728    ) -> bool {
1729        let mut guard = self.station_health_checks.write().await;
1730        let per_service = guard.entry(service_name.to_string()).or_default();
1731        if let Some(existing) = per_service.get(station_name)
1732            && !existing.done
1733        {
1734            return false;
1735        }
1736        per_service.insert(
1737            station_name.to_string(),
1738            HealthCheckStatus {
1739                started_at_ms: now_ms,
1740                updated_at_ms: now_ms,
1741                total: total.min(u32::MAX as usize) as u32,
1742                completed: 0,
1743                ok: 0,
1744                err: 0,
1745                cancel_requested: false,
1746                canceled: false,
1747                done: false,
1748                last_error: None,
1749            },
1750        );
1751        true
1752    }
1753
1754    pub async fn request_cancel_station_health_check(
1755        &self,
1756        service_name: &str,
1757        station_name: &str,
1758        now_ms: u64,
1759    ) -> bool {
1760        let mut guard = self.station_health_checks.write().await;
1761        let Some(per_service) = guard.get_mut(service_name) else {
1762            return false;
1763        };
1764        let Some(st) = per_service.get_mut(station_name) else {
1765            return false;
1766        };
1767        if st.done {
1768            return false;
1769        }
1770        st.cancel_requested = true;
1771        st.updated_at_ms = now_ms;
1772        true
1773    }
1774
1775    pub async fn is_station_health_check_cancel_requested(
1776        &self,
1777        service_name: &str,
1778        station_name: &str,
1779    ) -> bool {
1780        let guard = self.station_health_checks.read().await;
1781        guard
1782            .get(service_name)
1783            .and_then(|m| m.get(station_name))
1784            .is_some_and(|s| s.cancel_requested && !s.done)
1785    }
1786
1787    pub async fn record_station_health_check_result(
1788        &self,
1789        service_name: &str,
1790        station_name: &str,
1791        now_ms: u64,
1792        upstream: UpstreamHealth,
1793    ) {
1794        {
1795            let mut guard = self.station_health.write().await;
1796            let per_service = guard.entry(service_name.to_string()).or_default();
1797            let entry = per_service
1798                .entry(station_name.to_string())
1799                .or_insert_with(|| StationHealth {
1800                    checked_at_ms: now_ms,
1801                    upstreams: Vec::new(),
1802                });
1803            entry.checked_at_ms = entry.checked_at_ms.max(now_ms);
1804            entry.upstreams.push(upstream.clone());
1805            if entry.upstreams.len() > Self::MAX_HEALTH_RECORDS_PER_STATION {
1806                let extra = entry
1807                    .upstreams
1808                    .len()
1809                    .saturating_sub(Self::MAX_HEALTH_RECORDS_PER_STATION);
1810                if extra > 0 {
1811                    entry.upstreams.drain(0..extra);
1812                }
1813            }
1814        }
1815
1816        let mut guard = self.station_health_checks.write().await;
1817        let per_service = guard.entry(service_name.to_string()).or_default();
1818        let st = per_service.entry(station_name.to_string()).or_default();
1819        st.updated_at_ms = now_ms;
1820        st.completed = st.completed.saturating_add(1);
1821        match upstream.ok {
1822            Some(true) => st.ok = st.ok.saturating_add(1),
1823            Some(false) => {
1824                st.err = st.err.saturating_add(1);
1825                if st.last_error.is_none() {
1826                    st.last_error = upstream.error.clone();
1827                }
1828            }
1829            None => {}
1830        }
1831    }
1832
1833    pub async fn finish_station_health_check(
1834        &self,
1835        service_name: &str,
1836        station_name: &str,
1837        now_ms: u64,
1838        canceled: bool,
1839    ) {
1840        let mut guard = self.station_health_checks.write().await;
1841        let per_service = guard.entry(service_name.to_string()).or_default();
1842        let st = per_service.entry(station_name.to_string()).or_default();
1843        st.updated_at_ms = now_ms;
1844        st.canceled = canceled;
1845        st.done = true;
1846    }
1847
1848    pub async fn get_usage_rollup_view(
1849        &self,
1850        service_name: &str,
1851        top_n: usize,
1852        days: usize,
1853    ) -> UsageRollupView {
1854        let guard = self.usage_rollups.read().await;
1855        let Some(rollup) = guard.get(service_name) else {
1856            return UsageRollupView::default();
1857        };
1858
1859        fn now_day() -> i32 {
1860            std::time::SystemTime::now()
1861                .duration_since(std::time::UNIX_EPOCH)
1862                .map(|d| (d.as_millis() / 86_400_000) as i32)
1863                .unwrap_or(0)
1864        }
1865
1866        fn sorted_day_series(map: &HashMap<i32, UsageBucket>) -> Vec<(i32, UsageBucket)> {
1867            let mut out = map.iter().map(|(k, v)| (*k, v.clone())).collect::<Vec<_>>();
1868            out.sort_by_key(|(k, _)| *k);
1869            out
1870        }
1871
1872        fn filled_day_series(
1873            map: &HashMap<i32, UsageBucket>,
1874            start_day: i32,
1875            end_day: i32,
1876        ) -> Vec<(i32, UsageBucket)> {
1877            if start_day > end_day {
1878                return Vec::new();
1879            }
1880            (start_day..=end_day)
1881                .map(|day| (day, map.get(&day).cloned().unwrap_or_default()))
1882                .collect()
1883        }
1884
1885        fn sum_series(series: &[(i32, UsageBucket)]) -> UsageBucket {
1886            let mut out = UsageBucket::default();
1887            for (_, bucket) in series {
1888                out.add_assign(bucket);
1889            }
1890            out
1891        }
1892
1893        fn aggregate_entity_window(
1894            source: &HashMap<String, HashMap<i32, UsageBucket>>,
1895            start_day: Option<i32>,
1896            end_day: Option<i32>,
1897            top_n: usize,
1898        ) -> Vec<(String, UsageBucket)> {
1899            let mut out = Vec::new();
1900            for (name, days) in source {
1901                let mut bucket = UsageBucket::default();
1902                for (day, value) in days {
1903                    let include = match (start_day, end_day) {
1904                        (Some(start), Some(end)) => *day >= start && *day <= end,
1905                        _ => true,
1906                    };
1907                    if include {
1908                        bucket.add_assign(value);
1909                    }
1910                }
1911                if bucket.requests_total > 0 {
1912                    out.push((name.clone(), bucket));
1913                }
1914            }
1915            out.sort_by(|(left_name, left), (right_name, right)| {
1916                right
1917                    .usage
1918                    .total_tokens
1919                    .cmp(&left.usage.total_tokens)
1920                    .then_with(|| right.requests_total.cmp(&left.requests_total))
1921                    .then_with(|| left_name.cmp(right_name))
1922            });
1923            out.truncate(top_n);
1924            out
1925        }
1926
1927        let all_loaded = days == 0;
1928        let loaded_first_day = rollup.by_day.keys().min().copied();
1929        let loaded_last_day = rollup.by_day.keys().max().copied();
1930        let loaded_days_with_data = rollup
1931            .by_day
1932            .values()
1933            .filter(|bucket| bucket.requests_total > 0)
1934            .count();
1935
1936        let (start_day, end_day) = if all_loaded {
1937            (loaded_first_day, loaded_last_day)
1938        } else {
1939            let end = now_day();
1940            let offset = i32::try_from(days.saturating_sub(1)).unwrap_or(i32::MAX);
1941            (Some(end.saturating_sub(offset)), Some(end))
1942        };
1943
1944        let by_day = match (all_loaded, start_day, end_day) {
1945            (true, _, _) => sorted_day_series(&rollup.by_day),
1946            (false, Some(start), Some(end)) => filled_day_series(&rollup.by_day, start, end),
1947            _ => Vec::new(),
1948        };
1949        let window = if all_loaded {
1950            rollup.loaded.clone()
1951        } else {
1952            sum_series(&by_day)
1953        };
1954
1955        let mut by_config =
1956            aggregate_entity_window(&rollup.by_config_day, start_day, end_day, top_n);
1957        if all_loaded && by_config.is_empty() {
1958            by_config = rollup
1959                .by_config
1960                .iter()
1961                .map(|(k, v)| (k.clone(), v.clone()))
1962                .collect::<Vec<_>>();
1963            by_config.sort_by(|(left_name, left), (right_name, right)| {
1964                right
1965                    .usage
1966                    .total_tokens
1967                    .cmp(&left.usage.total_tokens)
1968                    .then_with(|| right.requests_total.cmp(&left.requests_total))
1969                    .then_with(|| left_name.cmp(right_name))
1970            });
1971            by_config.truncate(top_n);
1972        }
1973
1974        let mut by_provider =
1975            aggregate_entity_window(&rollup.by_provider_day, start_day, end_day, top_n);
1976        if all_loaded && by_provider.is_empty() {
1977            by_provider = rollup
1978                .by_provider
1979                .iter()
1980                .map(|(k, v)| (k.clone(), v.clone()))
1981                .collect::<Vec<_>>();
1982            by_provider.sort_by(|(left_name, left), (right_name, right)| {
1983                right
1984                    .usage
1985                    .total_tokens
1986                    .cmp(&left.usage.total_tokens)
1987                    .then_with(|| right.requests_total.cmp(&left.requests_total))
1988                    .then_with(|| left_name.cmp(right_name))
1989            });
1990            by_provider.truncate(top_n);
1991        }
1992
1993        let mut by_config_day = HashMap::new();
1994        for (name, _) in &by_config {
1995            let series = rollup
1996                .by_config_day
1997                .get(name)
1998                .map(|m| match (all_loaded, start_day, end_day) {
1999                    (true, _, _) => sorted_day_series(m),
2000                    (false, Some(start), Some(end)) => filled_day_series(m, start, end),
2001                    _ => Vec::new(),
2002                })
2003                .unwrap_or_default();
2004            by_config_day.insert(name.clone(), series);
2005        }
2006
2007        let mut by_provider_day = HashMap::new();
2008        for (name, _) in &by_provider {
2009            let series = rollup
2010                .by_provider_day
2011                .get(name)
2012                .map(|m| match (all_loaded, start_day, end_day) {
2013                    (true, _, _) => sorted_day_series(m),
2014                    (false, Some(start), Some(end)) => filled_day_series(m, start, end),
2015                    _ => Vec::new(),
2016                })
2017                .unwrap_or_default();
2018            by_provider_day.insert(name.clone(), series);
2019        }
2020
2021        let window_days_with_data = by_day
2022            .iter()
2023            .filter(|(_, bucket)| bucket.requests_total > 0)
2024            .count();
2025        let coverage = UsageRollupCoverage {
2026            requested_days: days,
2027            all_loaded,
2028            loaded_first_day,
2029            loaded_last_day,
2030            loaded_days_with_data,
2031            loaded_requests: rollup.loaded.requests_total,
2032            window_first_day: start_day,
2033            window_last_day: end_day,
2034            window_days_with_data,
2035            window_requests: window.requests_total,
2036            window_exceeds_loaded_start: matches!(
2037                (all_loaded, start_day, loaded_first_day),
2038                (false, Some(start), Some(first)) if start < first
2039            ),
2040        };
2041
2042        UsageRollupView {
2043            loaded: rollup.loaded.clone(),
2044            window,
2045            coverage,
2046            by_day,
2047            by_config,
2048            by_config_day,
2049            by_provider,
2050            by_provider_day,
2051        }
2052    }
2053
2054    pub async fn replay_usage_from_requests_log(
2055        &self,
2056        service_name: &str,
2057        log_path: PathBuf,
2058        base_url_to_provider_id: HashMap<String, String>,
2059    ) -> usize {
2060        let enabled = std::env::var("CODEX_HELPER_USAGE_REPLAY_ON_STARTUP")
2061            .ok()
2062            .map(|v| {
2063                matches!(
2064                    v.trim().to_ascii_lowercase().as_str(),
2065                    "1" | "true" | "yes" | "y" | "on"
2066                )
2067            })
2068            .unwrap_or(true);
2069        if !enabled {
2070            return 0;
2071        }
2072
2073        let already_has_data = {
2074            let guard = self.usage_rollups.read().await;
2075            guard
2076                .get(service_name)
2077                .is_some_and(|r| r.loaded.requests_total > 0)
2078        };
2079        if already_has_data {
2080            return 0;
2081        }
2082
2083        if !log_path.exists() {
2084            return 0;
2085        }
2086
2087        let max_bytes = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_BYTES")
2088            .ok()
2089            .and_then(|s| s.trim().parse::<usize>().ok())
2090            .filter(|&n| n > 0)
2091            .unwrap_or(8 * 1024 * 1024);
2092        let max_lines = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_LINES")
2093            .ok()
2094            .and_then(|s| s.trim().parse::<usize>().ok())
2095            .filter(|&n| n > 0)
2096            .unwrap_or(20_000);
2097
2098        let mut file = match std::fs::File::open(&log_path) {
2099            Ok(f) => f,
2100            Err(_) => return 0,
2101        };
2102        let len: u64 = file.metadata().map(|m| m.len()).unwrap_or_default();
2103        let start = len.saturating_sub(max_bytes as u64);
2104        if file.seek(SeekFrom::Start(start)).is_err() {
2105            return 0;
2106        }
2107        let mut buf = Vec::new();
2108        if file.read_to_end(&mut buf).is_err() {
2109            return 0;
2110        }
2111        if start > 0 {
2112            if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
2113                buf = buf[pos + 1..].to_vec();
2114            } else {
2115                return 0;
2116            }
2117        }
2118
2119        let text = match std::str::from_utf8(&buf) {
2120            Ok(s) => s,
2121            Err(_) => return 0,
2122        };
2123        let lines = text
2124            .lines()
2125            .map(|l| l.trim())
2126            .filter(|l| !l.is_empty())
2127            .collect::<Vec<_>>();
2128        let start_idx = lines.len().saturating_sub(max_lines);
2129
2130        let mut events = Vec::new();
2131        for line in &lines[start_idx..] {
2132            let Ok(v) = serde_json::from_str::<JsonValue>(line) else {
2133                continue;
2134            };
2135            let Some(svc) = v.get("service").and_then(|x| x.as_str()) else {
2136                continue;
2137            };
2138            if svc != service_name {
2139                continue;
2140            }
2141
2142            let ended_at_ms = v.get("timestamp_ms").and_then(|x| x.as_u64()).unwrap_or(0);
2143            let status_code = v.get("status_code").and_then(|x| x.as_u64()).unwrap_or(0) as u16;
2144            let duration_ms = v.get("duration_ms").and_then(|x| x.as_u64()).unwrap_or(0);
2145            let station_name = v
2146                .get("station_name")
2147                .and_then(|x| x.as_str())
2148                .unwrap_or("-")
2149                .to_string();
2150            let upstream_base_url = v
2151                .get("upstream_base_url")
2152                .and_then(|x| x.as_str())
2153                .unwrap_or("-")
2154                .to_string();
2155            let provider_id = v
2156                .get("provider_id")
2157                .and_then(|x| x.as_str())
2158                .map(|s| s.to_string())
2159                .or_else(|| base_url_to_provider_id.get(&upstream_base_url).cloned())
2160                .unwrap_or_else(|| "-".to_string());
2161            let usage = v
2162                .get("usage")
2163                .and_then(|u| serde_json::from_value::<UsageMetrics>(u.clone()).ok());
2164            let ttfb_ms = v.get("ttfb_ms").and_then(|x| x.as_u64());
2165
2166            events.push((
2167                ended_at_ms,
2168                status_code,
2169                duration_ms,
2170                station_name,
2171                provider_id,
2172                usage,
2173                ttfb_ms,
2174            ));
2175        }
2176
2177        if events.is_empty() {
2178            return 0;
2179        }
2180
2181        let mut guard = self.usage_rollups.write().await;
2182        let rollup = guard.entry(service_name.to_string()).or_default();
2183        for (ended_at_ms, status_code, duration_ms, cfg_key, provider_key, usage, ttfb_ms) in
2184            events.iter()
2185        {
2186            let day = (*ended_at_ms / 86_400_000) as i32;
2187            rollup
2188                .loaded
2189                .record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
2190            rollup.by_day.entry(day).or_default().record(
2191                *status_code,
2192                *duration_ms,
2193                usage.as_ref(),
2194                None,
2195                *ttfb_ms,
2196            );
2197            rollup.by_config.entry(cfg_key.clone()).or_default().record(
2198                *status_code,
2199                *duration_ms,
2200                usage.as_ref(),
2201                None,
2202                *ttfb_ms,
2203            );
2204            rollup
2205                .by_config_day
2206                .entry(cfg_key.clone())
2207                .or_default()
2208                .entry(day)
2209                .or_default()
2210                .record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
2211            rollup
2212                .by_provider
2213                .entry(provider_key.clone())
2214                .or_default()
2215                .record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
2216            rollup
2217                .by_provider_day
2218                .entry(provider_key.clone())
2219                .or_default()
2220                .entry(day)
2221                .or_default()
2222                .record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
2223        }
2224
2225        events.len()
2226    }
2227
2228    pub async fn resolve_session_cwd(&self, session_id: &str) -> Option<String> {
2229        if self.session_cwd_cache_max_entries == 0 {
2230            return sessions::find_codex_session_cwd_by_id(session_id)
2231                .await
2232                .ok()
2233                .flatten();
2234        }
2235
2236        let now_ms = std::time::SystemTime::now()
2237            .duration_since(std::time::UNIX_EPOCH)
2238            .map(|d| d.as_millis() as u64)
2239            .unwrap_or(0);
2240
2241        {
2242            let guard = self.session_cwd_cache.read().await;
2243            if let Some(v) = guard.get(session_id) {
2244                let out = v.cwd.clone();
2245                drop(guard);
2246                let mut guard = self.session_cwd_cache.write().await;
2247                if let Some(v) = guard.get_mut(session_id) {
2248                    v.last_seen_ms = now_ms;
2249                }
2250                return out;
2251            }
2252        }
2253
2254        // Cache miss: resolve from disk and record last_seen.
2255
2256        let resolved = sessions::find_codex_session_cwd_by_id(session_id)
2257            .await
2258            .ok()
2259            .flatten();
2260
2261        let mut guard = self.session_cwd_cache.write().await;
2262        guard.insert(
2263            session_id.to_string(),
2264            SessionCwdCacheEntry {
2265                cwd: resolved.clone(),
2266                last_seen_ms: now_ms,
2267            },
2268        );
2269        resolved
2270    }
2271
2272    #[allow(clippy::too_many_arguments)]
2273    pub async fn begin_request(
2274        &self,
2275        service: &str,
2276        method: &str,
2277        path: &str,
2278        session_id: Option<String>,
2279        client_name: Option<String>,
2280        client_addr: Option<String>,
2281        cwd: Option<String>,
2282        model: Option<String>,
2283        reasoning_effort: Option<String>,
2284        service_tier: Option<String>,
2285        started_at_ms: u64,
2286    ) -> u64 {
2287        let id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
2288        let trace_id = Some(crate::logging::request_trace_id(service, id));
2289        let req = ActiveRequest {
2290            id,
2291            trace_id,
2292            session_id,
2293            client_name,
2294            client_addr,
2295            cwd,
2296            model,
2297            reasoning_effort,
2298            service_tier,
2299            station_name: None,
2300            provider_id: None,
2301            upstream_base_url: None,
2302            route_decision: None,
2303            service: service.to_string(),
2304            method: method.to_string(),
2305            path: path.to_string(),
2306            started_at_ms,
2307        };
2308        let mut guard = self.active_requests.write().await;
2309        guard.insert(id, req);
2310        id
2311    }
2312
2313    pub async fn update_request_route(
2314        &self,
2315        request_id: u64,
2316        station_name: Option<String>,
2317        provider_id: Option<String>,
2318        upstream_base_url: String,
2319        route_decision: Option<RouteDecisionProvenance>,
2320    ) {
2321        let mut guard = self.active_requests.write().await;
2322        let Some(req) = guard.get_mut(&request_id) else {
2323            return;
2324        };
2325        req.station_name = station_name;
2326        req.provider_id = provider_id;
2327        req.upstream_base_url = Some(upstream_base_url);
2328        req.route_decision = route_decision;
2329    }
2330
2331    pub async fn finish_request(&self, params: FinishRequestParams) {
2332        let mut active = self.active_requests.write().await;
2333        let Some(req) = active.remove(&params.id) else {
2334            return;
2335        };
2336
2337        let pricing_model = req
2338            .route_decision
2339            .as_ref()
2340            .and_then(|decision| decision.effective_model.as_ref())
2341            .map(|value| value.value.as_str())
2342            .or(req.model.as_deref());
2343        let cost = estimate_request_cost_from_operator_catalog_for_service(
2344            pricing_model,
2345            params.usage.as_ref(),
2346            CostAdjustments::default(),
2347            &req.service,
2348        );
2349
2350        let mut finished = FinishedRequest {
2351            id: params.id,
2352            trace_id: req.trace_id,
2353            session_id: req.session_id,
2354            client_name: req.client_name,
2355            client_addr: req.client_addr,
2356            cwd: req.cwd,
2357            model: req.model,
2358            reasoning_effort: req.reasoning_effort,
2359            service_tier: params.observed_service_tier.or(req.service_tier),
2360            station_name: req.station_name,
2361            provider_id: req.provider_id,
2362            upstream_base_url: req.upstream_base_url,
2363            route_decision: req.route_decision,
2364            usage: params.usage.clone(),
2365            cost,
2366            retry: params.retry,
2367            observability: RequestObservability::default(),
2368            service: req.service,
2369            method: req.method,
2370            path: req.path,
2371            status_code: params.status_code,
2372            duration_ms: params.duration_ms,
2373            ttfb_ms: params.ttfb_ms,
2374            streaming: params.streaming,
2375            ended_at_ms: params.ended_at_ms,
2376        };
2377        finished.refresh_observability();
2378
2379        {
2380            let day = (finished.ended_at_ms / 86_400_000) as i32;
2381            let cfg_key = finished
2382                .station_name
2383                .clone()
2384                .unwrap_or_else(|| "-".to_string());
2385            let provider_key = finished
2386                .provider_id
2387                .clone()
2388                .unwrap_or_else(|| "-".to_string());
2389
2390            let mut rollups = self.usage_rollups.write().await;
2391            let rollup = rollups.entry(finished.service.clone()).or_default();
2392            rollup.loaded.record(
2393                finished.status_code,
2394                finished.duration_ms,
2395                finished.usage.as_ref(),
2396                Some(&finished.cost),
2397                finished.ttfb_ms,
2398            );
2399            rollup.by_day.entry(day).or_default().record(
2400                finished.status_code,
2401                finished.duration_ms,
2402                finished.usage.as_ref(),
2403                Some(&finished.cost),
2404                finished.ttfb_ms,
2405            );
2406            rollup.by_config.entry(cfg_key.clone()).or_default().record(
2407                finished.status_code,
2408                finished.duration_ms,
2409                finished.usage.as_ref(),
2410                Some(&finished.cost),
2411                finished.ttfb_ms,
2412            );
2413            rollup
2414                .by_config_day
2415                .entry(cfg_key)
2416                .or_default()
2417                .entry(day)
2418                .or_default()
2419                .record(
2420                    finished.status_code,
2421                    finished.duration_ms,
2422                    finished.usage.as_ref(),
2423                    Some(&finished.cost),
2424                    finished.ttfb_ms,
2425                );
2426
2427            rollup
2428                .by_provider
2429                .entry(provider_key.clone())
2430                .or_default()
2431                .record(
2432                    finished.status_code,
2433                    finished.duration_ms,
2434                    finished.usage.as_ref(),
2435                    Some(&finished.cost),
2436                    finished.ttfb_ms,
2437                );
2438            rollup
2439                .by_provider_day
2440                .entry(provider_key)
2441                .or_default()
2442                .entry(day)
2443                .or_default()
2444                .record(
2445                    finished.status_code,
2446                    finished.duration_ms,
2447                    finished.usage.as_ref(),
2448                    Some(&finished.cost),
2449                    finished.ttfb_ms,
2450                );
2451        }
2452
2453        if let Some(sid) = finished.session_id.as_deref() {
2454            let mut stats = self.session_stats.write().await;
2455            let entry = stats.entry(sid.to_string()).or_default();
2456            entry.turns_total = entry.turns_total.saturating_add(1);
2457            entry.last_client_name = finished
2458                .client_name
2459                .clone()
2460                .or(entry.last_client_name.clone());
2461            entry.last_client_addr = finished
2462                .client_addr
2463                .clone()
2464                .or(entry.last_client_addr.clone());
2465            entry.last_model = finished.model.clone().or(entry.last_model.clone());
2466            entry.last_reasoning_effort = finished
2467                .reasoning_effort
2468                .clone()
2469                .or(entry.last_reasoning_effort.clone());
2470            entry.last_service_tier = finished
2471                .service_tier
2472                .clone()
2473                .or(entry.last_service_tier.clone());
2474            entry.last_provider_id = finished
2475                .provider_id
2476                .clone()
2477                .or(entry.last_provider_id.clone());
2478            entry.last_station_name = finished
2479                .station_name
2480                .clone()
2481                .or(entry.last_station_name.clone());
2482            if finished.route_decision.is_some() {
2483                entry.last_route_decision = finished.route_decision.clone();
2484            }
2485            if let Some(u) = finished.usage.as_ref() {
2486                entry.last_usage = Some(u.clone());
2487                entry.total_usage.add_assign(u);
2488                entry.turns_with_usage = entry.turns_with_usage.saturating_add(1);
2489            }
2490            entry.last_status = Some(finished.status_code);
2491            entry.last_duration_ms = Some(finished.duration_ms);
2492            entry.last_ended_at_ms = Some(finished.ended_at_ms);
2493            entry.last_seen_ms = finished.ended_at_ms;
2494        }
2495
2496        let mut recent = self.recent_finished.write().await;
2497        recent.push_front(finished);
2498        while recent.len() > recent_finished_max() {
2499            recent.pop_back();
2500        }
2501    }
2502
2503    pub async fn list_active_requests(&self) -> Vec<ActiveRequest> {
2504        let guard = self.active_requests.read().await;
2505        let mut vec = guard.values().cloned().collect::<Vec<_>>();
2506        vec.sort_by_key(|r| r.started_at_ms);
2507        vec
2508    }
2509
2510    pub async fn list_recent_finished(&self, limit: usize) -> Vec<FinishedRequest> {
2511        let guard = self.recent_finished.read().await;
2512        guard.iter().take(limit).cloned().collect()
2513    }
2514
2515    pub async fn list_session_stats(&self) -> HashMap<String, SessionStats> {
2516        let guard = self.session_stats.read().await;
2517        guard.clone()
2518    }
2519
2520    pub async fn list_session_identity_cards(
2521        &self,
2522        recent_limit: usize,
2523    ) -> Vec<SessionIdentityCard> {
2524        let recent_limit = recent_limit.clamp(1, recent_finished_max());
2525        let (
2526            active,
2527            recent,
2528            overrides,
2529            station_overrides,
2530            model_overrides,
2531            service_tier_overrides,
2532            bindings,
2533            route_affinities,
2534            global_station_override,
2535            stats,
2536        ) = tokio::join!(
2537            self.list_active_requests(),
2538            self.list_recent_finished(recent_limit),
2539            self.list_session_effort_overrides(),
2540            self.list_session_station_overrides(),
2541            self.list_session_model_overrides(),
2542            self.list_session_service_tier_overrides(),
2543            self.list_session_bindings(),
2544            self.list_session_route_affinities(),
2545            self.get_global_station_override(),
2546            self.list_session_stats(),
2547        );
2548        build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
2549            active: &active,
2550            recent: &recent,
2551            overrides: &overrides,
2552            station_overrides: &station_overrides,
2553            model_overrides: &model_overrides,
2554            service_tier_overrides: &service_tier_overrides,
2555            bindings: &bindings,
2556            route_affinities: &route_affinities,
2557            global_station_override: global_station_override.as_deref(),
2558            stats: &stats,
2559        })
2560    }
2561
2562    async fn resolve_host_transcript_paths_cached(
2563        &self,
2564        session_ids: &[String],
2565    ) -> HashMap<String, Option<String>> {
2566        let mut unique = session_ids
2567            .iter()
2568            .map(|sid| sid.trim())
2569            .filter(|sid| !sid.is_empty())
2570            .map(ToOwned::to_owned)
2571            .collect::<Vec<_>>();
2572        unique.sort();
2573        unique.dedup();
2574        if unique.is_empty() {
2575            return HashMap::new();
2576        }
2577
2578        if self.session_transcript_path_cache_max_entries == 0 {
2579            return sessions::find_codex_session_files_by_ids(&unique)
2580                .await
2581                .unwrap_or_default()
2582                .into_iter()
2583                .map(|(sid, path)| (sid, Some(path.to_string_lossy().to_string())))
2584                .collect();
2585        }
2586
2587        let now_ms = unix_now_ms();
2588        let ttl_ms = self.session_transcript_path_cache_ttl_ms;
2589        let mut resolved = HashMap::<String, Option<String>>::new();
2590        let mut stale_or_missing = Vec::<String>::new();
2591
2592        {
2593            let cache = self.session_transcript_path_cache.read().await;
2594            for sid in &unique {
2595                let fresh = cache.get(sid).filter(|entry| {
2596                    ttl_ms == 0 || now_ms.saturating_sub(entry.last_checked_ms) <= ttl_ms
2597                });
2598                if let Some(entry) = fresh {
2599                    resolved.insert(sid.clone(), entry.path.clone());
2600                } else {
2601                    stale_or_missing.push(sid.clone());
2602                }
2603            }
2604        }
2605
2606        if !stale_or_missing.is_empty() {
2607            let found = sessions::find_codex_session_files_by_ids(&stale_or_missing)
2608                .await
2609                .unwrap_or_default();
2610            let mut cache = self.session_transcript_path_cache.write().await;
2611            for sid in stale_or_missing {
2612                let path = found
2613                    .get(&sid)
2614                    .map(|path| path.to_string_lossy().to_string());
2615                cache.insert(
2616                    sid.clone(),
2617                    SessionTranscriptPathCacheEntry {
2618                        path: path.clone(),
2619                        last_checked_ms: now_ms,
2620                        last_seen_ms: now_ms,
2621                    },
2622                );
2623                resolved.insert(sid, path);
2624            }
2625            prune_lru_cache(
2626                &mut cache,
2627                self.session_transcript_path_cache_max_entries,
2628                |entry| entry.last_seen_ms,
2629            );
2630        }
2631
2632        {
2633            let mut cache = self.session_transcript_path_cache.write().await;
2634            for sid in &unique {
2635                if let Some(entry) = cache.get_mut(sid) {
2636                    entry.last_seen_ms = now_ms;
2637                }
2638            }
2639        }
2640
2641        resolved
2642    }
2643
2644    pub async fn enrich_session_identity_cards_with_cached_host_transcripts(
2645        &self,
2646        cards: &mut [SessionIdentityCard],
2647    ) {
2648        let session_ids = cards
2649            .iter()
2650            .filter_map(|card| card.session_id.as_deref())
2651            .map(str::to_owned)
2652            .collect::<Vec<_>>();
2653        let resolved = self
2654            .resolve_host_transcript_paths_cached(&session_ids)
2655            .await;
2656        for card in cards {
2657            card.host_local_transcript_path = card
2658                .session_id
2659                .as_deref()
2660                .and_then(|sid| resolved.get(sid))
2661                .and_then(Clone::clone);
2662        }
2663    }
2664
2665    pub async fn list_session_identity_cards_with_host_transcripts(
2666        &self,
2667        recent_limit: usize,
2668    ) -> Vec<SessionIdentityCard> {
2669        let mut cards = self.list_session_identity_cards(recent_limit).await;
2670        self.enrich_session_identity_cards_with_cached_host_transcripts(&mut cards)
2671            .await;
2672        cards
2673    }
2674
2675    pub fn spawn_cleanup_task(state: Arc<Self>) {
2676        // Run periodically; no need to be super frequent.
2677        tokio::spawn(async move {
2678            let mut tick = interval(Duration::from_secs(30));
2679            loop {
2680                tick.tick().await;
2681                state.prune_periodic().await;
2682            }
2683        });
2684    }
2685
2686    async fn prune_periodic(&self) {
2687        let now_ms = std::time::SystemTime::now()
2688            .duration_since(std::time::UNIX_EPOCH)
2689            .map(|d| d.as_millis() as u64)
2690            .unwrap_or(0);
2691
2692        // Collect active session_ids to avoid clearing overrides for currently running requests.
2693        let active = self.active_requests.read().await;
2694        let mut active_sessions: HashMap<String, ()> = HashMap::new();
2695        for req in active.values() {
2696            if let Some(sid) = req.session_id.as_deref() {
2697                active_sessions.insert(sid.to_string(), ());
2698            }
2699        }
2700
2701        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2702            let cutoff_override = now_ms - self.session_override_ttl_ms;
2703            let mut overrides = self.session_effort_overrides.write().await;
2704            overrides.retain(|sid, v| {
2705                if active_sessions.contains_key(sid) {
2706                    return true;
2707                }
2708                v.last_seen_ms >= cutoff_override
2709            });
2710        }
2711
2712        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2713            let cutoff_override = now_ms - self.session_override_ttl_ms;
2714            let mut overrides = self.session_station_overrides.write().await;
2715            overrides.retain(|sid, v| {
2716                if active_sessions.contains_key(sid) {
2717                    return true;
2718                }
2719                v.last_seen_ms >= cutoff_override
2720            });
2721        }
2722
2723        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2724            let cutoff_override = now_ms - self.session_override_ttl_ms;
2725            let mut overrides = self.session_route_target_overrides.write().await;
2726            overrides.retain(|sid, v| {
2727                if active_sessions.contains_key(sid) {
2728                    return true;
2729                }
2730                v.last_seen_ms >= cutoff_override
2731            });
2732        }
2733
2734        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2735            let cutoff_override = now_ms - self.session_override_ttl_ms;
2736            let mut overrides = self.session_model_overrides.write().await;
2737            overrides.retain(|sid, v| {
2738                if active_sessions.contains_key(sid) {
2739                    return true;
2740                }
2741                v.last_seen_ms >= cutoff_override
2742            });
2743        }
2744
2745        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2746            let cutoff_override = now_ms - self.session_override_ttl_ms;
2747            let mut overrides = self.session_service_tier_overrides.write().await;
2748            overrides.retain(|sid, v| {
2749                if active_sessions.contains_key(sid) {
2750                    return true;
2751                }
2752                v.last_seen_ms >= cutoff_override
2753            });
2754        }
2755
2756        if self.session_binding_ttl_ms > 0 && now_ms >= self.session_binding_ttl_ms {
2757            let cutoff_binding = now_ms - self.session_binding_ttl_ms;
2758            let mut bindings = self.session_bindings.write().await;
2759            bindings.retain(|sid, entry| {
2760                if active_sessions.contains_key(sid) {
2761                    return true;
2762                }
2763                entry.binding.last_seen_ms >= cutoff_binding
2764            });
2765        }
2766        if self.session_binding_max_entries > 0 {
2767            let mut bindings = self.session_bindings.write().await;
2768            if bindings.len() > self.session_binding_max_entries {
2769                let mut removable = bindings
2770                    .iter()
2771                    .filter(|(sid, _)| !active_sessions.contains_key(*sid))
2772                    .map(|(sid, entry)| (sid.clone(), entry.binding.last_seen_ms))
2773                    .collect::<Vec<_>>();
2774                removable.sort_by_key(|(_, last_seen_ms)| *last_seen_ms);
2775                let remove_count = bindings
2776                    .len()
2777                    .saturating_sub(self.session_binding_max_entries)
2778                    .min(removable.len());
2779                for (sid, _) in removable.into_iter().take(remove_count) {
2780                    bindings.remove(&sid);
2781                }
2782            }
2783        }
2784
2785        {
2786            let mut affinities = self.session_route_affinities.write().await;
2787            if self.session_route_affinity_ttl_ms > 0
2788                && now_ms >= self.session_route_affinity_ttl_ms
2789            {
2790                let cutoff_affinity = now_ms - self.session_route_affinity_ttl_ms;
2791                affinities.retain(|sid, affinity| {
2792                    active_sessions.contains_key(sid)
2793                        || affinity.last_selected_at_ms >= cutoff_affinity
2794                });
2795            }
2796            prune_lru_cache(
2797                &mut affinities,
2798                self.session_route_affinity_max_entries,
2799                |entry| entry.last_selected_at_ms,
2800            );
2801        }
2802
2803        // Keep a bounded number of days of rollup data to avoid unbounded growth.
2804        let keep_days: i32 = std::env::var("CODEX_HELPER_USAGE_ROLLUP_KEEP_DAYS")
2805            .ok()
2806            .and_then(|s| s.trim().parse::<i32>().ok())
2807            .filter(|&n| n > 0)
2808            .unwrap_or(60);
2809        let now_day = (now_ms / 86_400_000) as i32;
2810        let cutoff_day = now_day.saturating_sub(keep_days);
2811        let mut rollups = self.usage_rollups.write().await;
2812        for rollup in rollups.values_mut() {
2813            rollup.by_day.retain(|day, _| *day >= cutoff_day);
2814            rollup.by_config_day.retain(|_, m| {
2815                m.retain(|day, _| *day >= cutoff_day);
2816                !m.is_empty()
2817            });
2818            rollup.by_provider_day.retain(|_, m| {
2819                m.retain(|day, _| *day >= cutoff_day);
2820                !m.is_empty()
2821            });
2822        }
2823
2824        let cutoff_cwd =
2825            if self.session_cwd_cache_ttl_ms == 0 || now_ms < self.session_cwd_cache_ttl_ms {
2826                0
2827            } else {
2828                now_ms - self.session_cwd_cache_ttl_ms
2829            };
2830        self.prune_session_cwd_cache(&active_sessions, cutoff_cwd)
2831            .await;
2832        self.prune_session_transcript_path_cache(now_ms).await;
2833
2834        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2835            let cutoff_stats = now_ms - self.session_override_ttl_ms;
2836            let mut stats = self.session_stats.write().await;
2837            stats.retain(|sid, v| {
2838                active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_stats
2839            });
2840        }
2841    }
2842
2843    async fn prune_session_cwd_cache(&self, active_sessions: &HashMap<String, ()>, cutoff: u64) {
2844        if self.session_cwd_cache_max_entries == 0 {
2845            return;
2846        }
2847        let mut cache = self.session_cwd_cache.write().await;
2848
2849        if self.session_cwd_cache_ttl_ms > 0 {
2850            cache.retain(|sid, v| {
2851                if active_sessions.contains_key(sid) {
2852                    return true;
2853                }
2854                v.last_seen_ms >= cutoff
2855            });
2856        }
2857
2858        let max = self.session_cwd_cache_max_entries;
2859        if max == 0 || cache.len() <= max {
2860            return;
2861        }
2862
2863        // Drop least-recently-seen entries first.
2864        let mut keys = cache
2865            .iter()
2866            .map(|(sid, v)| (sid.clone(), v.last_seen_ms))
2867            .collect::<Vec<_>>();
2868        keys.sort_by_key(|(_, t)| *t);
2869        let remove_count = keys.len().saturating_sub(max);
2870        for (sid, _) in keys.into_iter().take(remove_count) {
2871            cache.remove(&sid);
2872        }
2873    }
2874
2875    async fn prune_session_transcript_path_cache(&self, now_ms: u64) {
2876        let mut cache = self.session_transcript_path_cache.write().await;
2877        if self.session_transcript_path_cache_max_entries == 0 {
2878            cache.clear();
2879            return;
2880        }
2881
2882        if self.session_transcript_path_cache_ttl_ms > 0 {
2883            let cutoff = now_ms.saturating_sub(self.session_transcript_path_cache_ttl_ms);
2884            cache.retain(|_, entry| entry.last_seen_ms >= cutoff);
2885        }
2886
2887        prune_lru_cache(
2888            &mut cache,
2889            self.session_transcript_path_cache_max_entries,
2890            |entry| entry.last_seen_ms,
2891        );
2892    }
2893}
2894
2895#[cfg(test)]
2896mod tests {
2897    use super::*;
2898
2899    use crate::config::{ServiceConfig, ServiceConfigManager, UpstreamAuth, UpstreamConfig};
2900    use crate::runtime_identity::ProviderEndpointKey;
2901
2902    fn test_runtime_policy(
2903        session_override_ttl_ms: u64,
2904        session_binding_ttl_ms: u64,
2905        session_binding_max_entries: usize,
2906    ) -> RuntimePolicy {
2907        RuntimePolicy {
2908            session_override_ttl_ms,
2909            session_binding_ttl_ms,
2910            session_binding_max_entries,
2911            session_route_affinity_ttl_ms: 0,
2912            session_route_affinity_max_entries: 5_000,
2913            session_cwd_cache_ttl_ms: 0,
2914            session_cwd_cache_max_entries: 0,
2915            session_transcript_path_cache_ttl_ms: 30_000,
2916            session_transcript_path_cache_max_entries: 5_000,
2917        }
2918    }
2919
2920    #[test]
2921    fn begin_and_finish_requests_keep_trace_id() {
2922        let runtime = tokio::runtime::Runtime::new().expect("runtime");
2923        runtime.block_on(async {
2924            let state = ProxyState::new();
2925            let request_id = state
2926                .begin_request(
2927                    "codex",
2928                    "POST",
2929                    "/v1/responses",
2930                    None,
2931                    None,
2932                    None,
2933                    None,
2934                    Some("gpt-5".to_string()),
2935                    None,
2936                    Some("priority".to_string()),
2937                    100,
2938                )
2939                .await;
2940
2941            let active = state.list_active_requests().await;
2942            assert_eq!(active[0].trace_id.as_deref(), Some("codex-1"));
2943
2944            state
2945                .finish_request(FinishRequestParams {
2946                    id: request_id,
2947                    status_code: 200,
2948                    duration_ms: 10,
2949                    ended_at_ms: 110,
2950                    observed_service_tier: Some("priority".to_string()),
2951                    usage: None,
2952                    retry: None,
2953                    ttfb_ms: Some(4),
2954                    streaming: false,
2955                })
2956                .await;
2957
2958            let recent = state.list_recent_finished(1).await;
2959            assert_eq!(recent[0].trace_id.as_deref(), Some("codex-1"));
2960            assert_eq!(recent[0].observability.trace_id.as_deref(), Some("codex-1"));
2961            assert!(recent[0].observability.fast_mode);
2962            assert_eq!(recent[0].observability.generation_ms, Some(6));
2963        });
2964    }
2965
2966    #[test]
2967    fn finish_request_estimates_cost_and_rolls_up_cost() {
2968        let runtime = tokio::runtime::Runtime::new().expect("runtime");
2969        runtime.block_on(async {
2970            let state = ProxyState::new();
2971            let request_id = state
2972                .begin_request(
2973                    "codex",
2974                    "POST",
2975                    "/v1/responses",
2976                    None,
2977                    None,
2978                    None,
2979                    None,
2980                    Some("gpt-5".to_string()),
2981                    None,
2982                    None,
2983                    100,
2984                )
2985                .await;
2986
2987            state
2988                .finish_request(FinishRequestParams {
2989                    id: request_id,
2990                    status_code: 200,
2991                    duration_ms: 10,
2992                    ended_at_ms: 110,
2993                    observed_service_tier: None,
2994                    usage: Some(UsageMetrics {
2995                        input_tokens: 1_000,
2996                        output_tokens: 500,
2997                        cached_input_tokens: 100,
2998                        total_tokens: 1_500,
2999                        ..UsageMetrics::default()
3000                    }),
3001                    retry: None,
3002                    ttfb_ms: Some(4),
3003                    streaming: false,
3004                })
3005                .await;
3006
3007            let recent = state.list_recent_finished(1).await;
3008            assert_eq!(recent[0].cost.total_cost_usd.as_deref(), Some("0.0061375"));
3009
3010            let rollup = state.get_usage_rollup_view("codex", 12, 1).await;
3011            assert_eq!(
3012                rollup.loaded.cost.total_cost_usd.as_deref(),
3013                Some("0.0061375")
3014            );
3015            assert_eq!(rollup.loaded.cost.priced_requests, 1);
3016            assert_eq!(rollup.loaded.cost.unpriced_requests, 0);
3017        });
3018    }
3019
3020    #[test]
3021    fn usage_rollup_view_scores_entities_inside_selected_window() {
3022        let runtime = tokio::runtime::Runtime::new().expect("runtime");
3023        runtime.block_on(async {
3024            let state = ProxyState::new();
3025            let now_ms = std::time::SystemTime::now()
3026                .duration_since(std::time::UNIX_EPOCH)
3027                .map(|d| d.as_millis() as u64)
3028                .unwrap_or(0);
3029            let old_ms = now_ms.saturating_sub(10 * 86_400_000);
3030
3031            let old_id = state
3032                .begin_request(
3033                    "codex",
3034                    "POST",
3035                    "/v1/responses",
3036                    None,
3037                    None,
3038                    None,
3039                    None,
3040                    Some("gpt-5".to_string()),
3041                    None,
3042                    None,
3043                    old_ms.saturating_sub(1_000),
3044                )
3045                .await;
3046            state
3047                .update_request_route(
3048                    old_id,
3049                    Some("old-station".to_string()),
3050                    Some("old-provider".to_string()),
3051                    "https://old.example/v1".to_string(),
3052                    None,
3053                )
3054                .await;
3055            state
3056                .finish_request(FinishRequestParams {
3057                    id: old_id,
3058                    status_code: 200,
3059                    duration_ms: 20,
3060                    ended_at_ms: old_ms,
3061                    observed_service_tier: None,
3062                    usage: Some(UsageMetrics {
3063                        total_tokens: 100_000,
3064                        ..UsageMetrics::default()
3065                    }),
3066                    retry: None,
3067                    ttfb_ms: Some(5),
3068                    streaming: false,
3069                })
3070                .await;
3071
3072            let fresh_id = state
3073                .begin_request(
3074                    "codex",
3075                    "POST",
3076                    "/v1/responses",
3077                    None,
3078                    None,
3079                    None,
3080                    None,
3081                    Some("gpt-5".to_string()),
3082                    None,
3083                    None,
3084                    now_ms.saturating_sub(1_000),
3085                )
3086                .await;
3087            state
3088                .update_request_route(
3089                    fresh_id,
3090                    Some("fresh-station".to_string()),
3091                    Some("fresh-provider".to_string()),
3092                    "https://fresh.example/v1".to_string(),
3093                    None,
3094                )
3095                .await;
3096            state
3097                .finish_request(FinishRequestParams {
3098                    id: fresh_id,
3099                    status_code: 200,
3100                    duration_ms: 10,
3101                    ended_at_ms: now_ms,
3102                    observed_service_tier: None,
3103                    usage: Some(UsageMetrics {
3104                        total_tokens: 10,
3105                        ..UsageMetrics::default()
3106                    }),
3107                    retry: None,
3108                    ttfb_ms: Some(3),
3109                    streaming: false,
3110                })
3111                .await;
3112
3113            let week = state.get_usage_rollup_view("codex", 10, 7).await;
3114            assert_eq!(week.loaded.requests_total, 2);
3115            assert_eq!(week.window.requests_total, 1);
3116            assert_eq!(week.by_day.len(), 7);
3117            assert_eq!(week.by_config[0].0, "fresh-station");
3118            assert_eq!(week.by_provider[0].0, "fresh-provider");
3119
3120            let loaded = state.get_usage_rollup_view("codex", 10, 0).await;
3121            assert_eq!(loaded.window.requests_total, 2);
3122            assert_eq!(loaded.by_config[0].0, "old-station");
3123            assert_eq!(loaded.by_provider[0].0, "old-provider");
3124        });
3125    }
3126
3127    #[test]
3128    fn build_session_identity_cards_merges_sources_and_sorts_newest_first() {
3129        let active = vec![ActiveRequest {
3130            id: 1,
3131            trace_id: Some("codex-1".to_string()),
3132            session_id: Some("sid-active".to_string()),
3133            client_name: Some("Frank-Laptop".to_string()),
3134            client_addr: Some("100.64.0.8".to_string()),
3135            cwd: Some("G:/codes/project".to_string()),
3136            model: Some("gpt-5.4".to_string()),
3137            reasoning_effort: Some("medium".to_string()),
3138            service_tier: Some("priority".to_string()),
3139            station_name: Some("right".to_string()),
3140            provider_id: Some("right".to_string()),
3141            upstream_base_url: Some("https://right.example/v1".to_string()),
3142            route_decision: None,
3143            service: "codex".to_string(),
3144            method: "POST".to_string(),
3145            path: "/v1/responses".to_string(),
3146            started_at_ms: 500,
3147        }];
3148        let recent = vec![
3149            FinishedRequest {
3150                id: 2,
3151                trace_id: Some("codex-2".to_string()),
3152                session_id: Some("sid-recent".to_string()),
3153                client_name: Some("Studio-Mini".to_string()),
3154                client_addr: Some("100.64.0.9".to_string()),
3155                cwd: Some("G:/codes/other".to_string()),
3156                model: Some("gpt-5.3".to_string()),
3157                reasoning_effort: Some("high".to_string()),
3158                service_tier: Some("default".to_string()),
3159                station_name: Some("vibe".to_string()),
3160                provider_id: Some("vibe".to_string()),
3161                upstream_base_url: Some("https://vibe.example/v1".to_string()),
3162                route_decision: None,
3163                usage: Some(UsageMetrics {
3164                    input_tokens: 1,
3165                    output_tokens: 2,
3166                    reasoning_tokens: 3,
3167                    total_tokens: 6,
3168                    ..UsageMetrics::default()
3169                }),
3170                cost: CostBreakdown::default(),
3171                retry: None,
3172                observability: RequestObservability::default(),
3173                service: "codex".to_string(),
3174                method: "POST".to_string(),
3175                path: "/v1/responses".to_string(),
3176                status_code: 200,
3177                duration_ms: 1200,
3178                ttfb_ms: Some(100),
3179                streaming: false,
3180                ended_at_ms: 2_000,
3181            },
3182            FinishedRequest {
3183                id: 3,
3184                trace_id: Some("codex-3".to_string()),
3185                session_id: Some("sid-active".to_string()),
3186                client_name: Some("Frank-Laptop".to_string()),
3187                client_addr: Some("100.64.0.8".to_string()),
3188                cwd: Some("G:/codes/project".to_string()),
3189                model: Some("gpt-5.4".to_string()),
3190                reasoning_effort: Some("low".to_string()),
3191                service_tier: Some("flex".to_string()),
3192                station_name: Some("right".to_string()),
3193                provider_id: Some("right".to_string()),
3194                upstream_base_url: Some("https://right.example/v1".to_string()),
3195                route_decision: None,
3196                usage: None,
3197                cost: CostBreakdown::default(),
3198                retry: None,
3199                observability: RequestObservability::default(),
3200                service: "codex".to_string(),
3201                method: "POST".to_string(),
3202                path: "/v1/responses".to_string(),
3203                status_code: 429,
3204                duration_ms: 900,
3205                ttfb_ms: None,
3206                streaming: false,
3207                ended_at_ms: 1_000,
3208            },
3209        ];
3210        let overrides = HashMap::from([("sid-active".to_string(), "xhigh".to_string())]);
3211        let config_overrides = HashMap::from([("sid-active".to_string(), "temp".to_string())]);
3212        let model_overrides =
3213            HashMap::from([("sid-active".to_string(), "gpt-5.4-mini".to_string())]);
3214        let service_tier_overrides =
3215            HashMap::from([("sid-active".to_string(), "priority".to_string())]);
3216        let stats = HashMap::from([(
3217            "sid-active".to_string(),
3218            SessionStats {
3219                turns_total: 3,
3220                last_client_name: Some("Frank-Laptop".to_string()),
3221                last_client_addr: Some("100.64.0.8".to_string()),
3222                last_model: Some("gpt-5.4".to_string()),
3223                last_reasoning_effort: Some("low".to_string()),
3224                last_service_tier: Some("flex".to_string()),
3225                last_provider_id: Some("right".to_string()),
3226                last_station_name: Some("right".to_string()),
3227                last_route_decision: None,
3228                last_usage: None,
3229                total_usage: UsageMetrics {
3230                    input_tokens: 10,
3231                    output_tokens: 20,
3232                    reasoning_tokens: 5,
3233                    total_tokens: 35,
3234                    ..UsageMetrics::default()
3235                },
3236                turns_with_usage: 2,
3237                last_status: Some(429),
3238                last_duration_ms: Some(900),
3239                last_ended_at_ms: Some(1_000),
3240                last_seen_ms: 1_000,
3241            },
3242        )]);
3243
3244        let cards = build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
3245            active: &active,
3246            recent: &recent,
3247            overrides: &overrides,
3248            station_overrides: &config_overrides,
3249            model_overrides: &model_overrides,
3250            service_tier_overrides: &service_tier_overrides,
3251            bindings: &HashMap::new(),
3252            route_affinities: &HashMap::new(),
3253            global_station_override: None,
3254            stats: &stats,
3255        });
3256
3257        assert_eq!(cards.len(), 2);
3258        assert_eq!(cards[0].session_id.as_deref(), Some("sid-recent"));
3259        assert_eq!(
3260            cards[0].observation_scope,
3261            SessionObservationScope::HostLocalEnriched
3262        );
3263        assert_eq!(cards[0].last_client_name.as_deref(), Some("Studio-Mini"));
3264        assert_eq!(cards[0].last_client_addr.as_deref(), Some("100.64.0.9"));
3265        assert_eq!(cards[1].session_id.as_deref(), Some("sid-active"));
3266        assert_eq!(
3267            cards[1].observation_scope,
3268            SessionObservationScope::HostLocalEnriched
3269        );
3270        assert_eq!(cards[1].active_count, 1);
3271        assert_eq!(cards[1].last_client_name.as_deref(), Some("Frank-Laptop"));
3272        assert_eq!(cards[1].last_client_addr.as_deref(), Some("100.64.0.8"));
3273        assert_eq!(cards[1].last_status, Some(429));
3274        assert_eq!(cards[1].override_effort.as_deref(), Some("xhigh"));
3275        assert_eq!(cards[1].override_station_name.as_deref(), Some("temp"));
3276        assert_eq!(cards[1].override_model.as_deref(), Some("gpt-5.4-mini"));
3277        assert_eq!(cards[1].override_service_tier.as_deref(), Some("priority"));
3278        assert_eq!(
3279            cards[1]
3280                .effective_model
3281                .as_ref()
3282                .map(|value| value.value.as_str()),
3283            Some("gpt-5.4-mini")
3284        );
3285        assert_eq!(
3286            cards[1].effective_model.as_ref().map(|value| value.source),
3287            Some(RouteValueSource::SessionOverride)
3288        );
3289        assert_eq!(
3290            cards[1]
3291                .effective_reasoning_effort
3292                .as_ref()
3293                .map(|value| value.source),
3294            Some(RouteValueSource::SessionOverride)
3295        );
3296        assert_eq!(
3297            cards[1]
3298                .effective_service_tier
3299                .as_ref()
3300                .map(|value| value.source),
3301            Some(RouteValueSource::SessionOverride)
3302        );
3303        assert_eq!(
3304            cards[1]
3305                .effective_station
3306                .as_ref()
3307                .map(|value| value.source),
3308            Some(RouteValueSource::SessionOverride)
3309        );
3310        assert!(cards[1].effective_upstream_base_url.is_none());
3311        assert_eq!(
3312            cards[1].last_upstream_base_url.as_deref(),
3313            Some("https://right.example/v1")
3314        );
3315        assert_eq!(cards[1].turns_total, Some(3));
3316        assert_eq!(cards[1].last_service_tier.as_deref(), Some("flex"));
3317        assert_eq!(
3318            cards[1].total_usage.as_ref().map(|u| u.total_tokens),
3319            Some(35)
3320        );
3321    }
3322
3323    #[test]
3324    fn build_session_identity_cards_prefers_binding_defaults_for_effective_route() {
3325        let active = vec![ActiveRequest {
3326            id: 1,
3327            trace_id: Some("codex-1".to_string()),
3328            session_id: Some("sid-bound".to_string()),
3329            client_name: Some("Workstation".to_string()),
3330            client_addr: Some("100.64.0.10".to_string()),
3331            cwd: None,
3332            model: Some("gpt-observed".to_string()),
3333            reasoning_effort: Some("medium".to_string()),
3334            service_tier: Some("default".to_string()),
3335            station_name: Some("right".to_string()),
3336            provider_id: None,
3337            upstream_base_url: None,
3338            route_decision: None,
3339            service: "codex".to_string(),
3340            method: "POST".to_string(),
3341            path: "/v1/responses".to_string(),
3342            started_at_ms: 10,
3343        }];
3344        let bindings = HashMap::from([(
3345            "sid-bound".to_string(),
3346            SessionBinding {
3347                session_id: "sid-bound".to_string(),
3348                profile_name: Some("daily".to_string()),
3349                station_name: Some("vibe".to_string()),
3350                model: Some("gpt-bound".to_string()),
3351                reasoning_effort: Some("high".to_string()),
3352                service_tier: Some("priority".to_string()),
3353                continuity_mode: SessionContinuityMode::DefaultProfile,
3354                created_at_ms: 1,
3355                updated_at_ms: 1,
3356                last_seen_ms: 10,
3357            },
3358        )]);
3359
3360        let cards = build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
3361            active: &active,
3362            recent: &[],
3363            overrides: &HashMap::new(),
3364            station_overrides: &HashMap::new(),
3365            model_overrides: &HashMap::new(),
3366            service_tier_overrides: &HashMap::new(),
3367            bindings: &bindings,
3368            route_affinities: &HashMap::new(),
3369            global_station_override: None,
3370            stats: &HashMap::new(),
3371        });
3372
3373        assert_eq!(cards[0].binding_profile_name.as_deref(), Some("daily"));
3374        assert_eq!(
3375            cards[0].observation_scope,
3376            SessionObservationScope::ObservedOnly
3377        );
3378        assert_eq!(
3379            cards[0].binding_continuity_mode,
3380            Some(SessionContinuityMode::DefaultProfile)
3381        );
3382        assert_eq!(
3383            cards[0]
3384                .effective_model
3385                .as_ref()
3386                .map(|value| (value.value.as_str(), value.source)),
3387            Some(("gpt-bound", RouteValueSource::ProfileDefault))
3388        );
3389        assert_eq!(
3390            cards[0]
3391                .effective_reasoning_effort
3392                .as_ref()
3393                .map(|value| (value.value.as_str(), value.source)),
3394            Some(("high", RouteValueSource::ProfileDefault))
3395        );
3396        assert_eq!(
3397            cards[0]
3398                .effective_service_tier
3399                .as_ref()
3400                .map(|value| (value.value.as_str(), value.source)),
3401            Some(("priority", RouteValueSource::ProfileDefault))
3402        );
3403        assert_eq!(
3404            cards[0]
3405                .effective_station
3406                .as_ref()
3407                .map(|value| (value.value.as_str(), value.source)),
3408            Some(("vibe", RouteValueSource::ProfileDefault))
3409        );
3410    }
3411
3412    #[test]
3413    fn build_session_identity_cards_keeps_binding_values_but_allows_global_config_override() {
3414        let active = vec![ActiveRequest {
3415            id: 1,
3416            trace_id: Some("codex-1".to_string()),
3417            session_id: Some("sid-bound".to_string()),
3418            client_name: Some("Workstation".to_string()),
3419            client_addr: Some("100.64.0.10".to_string()),
3420            cwd: None,
3421            model: Some("gpt-observed".to_string()),
3422            reasoning_effort: Some("medium".to_string()),
3423            service_tier: Some("default".to_string()),
3424            station_name: Some("vibe".to_string()),
3425            provider_id: None,
3426            upstream_base_url: Some("https://vibe.example/v1".to_string()),
3427            route_decision: None,
3428            service: "codex".to_string(),
3429            method: "POST".to_string(),
3430            path: "/v1/responses".to_string(),
3431            started_at_ms: 10,
3432        }];
3433        let bindings = HashMap::from([(
3434            "sid-bound".to_string(),
3435            SessionBinding {
3436                session_id: "sid-bound".to_string(),
3437                profile_name: Some("daily".to_string()),
3438                station_name: Some("vibe".to_string()),
3439                model: Some("gpt-bound".to_string()),
3440                reasoning_effort: Some("high".to_string()),
3441                service_tier: Some("priority".to_string()),
3442                continuity_mode: SessionContinuityMode::DefaultProfile,
3443                created_at_ms: 1,
3444                updated_at_ms: 1,
3445                last_seen_ms: 10,
3446            },
3447        )]);
3448
3449        let cards = build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
3450            active: &active,
3451            recent: &[],
3452            overrides: &HashMap::new(),
3453            station_overrides: &HashMap::new(),
3454            model_overrides: &HashMap::new(),
3455            service_tier_overrides: &HashMap::new(),
3456            bindings: &bindings,
3457            route_affinities: &HashMap::new(),
3458            global_station_override: Some("right"),
3459            stats: &HashMap::new(),
3460        });
3461
3462        assert_eq!(cards[0].binding_profile_name.as_deref(), Some("daily"));
3463        assert_eq!(
3464            cards[0]
3465                .effective_model
3466                .as_ref()
3467                .map(|value| (value.value.as_str(), value.source)),
3468            Some(("gpt-bound", RouteValueSource::ProfileDefault))
3469        );
3470        assert_eq!(
3471            cards[0]
3472                .effective_reasoning_effort
3473                .as_ref()
3474                .map(|value| (value.value.as_str(), value.source)),
3475            Some(("high", RouteValueSource::ProfileDefault))
3476        );
3477        assert_eq!(
3478            cards[0]
3479                .effective_service_tier
3480                .as_ref()
3481                .map(|value| (value.value.as_str(), value.source)),
3482            Some(("priority", RouteValueSource::ProfileDefault))
3483        );
3484        assert_eq!(
3485            cards[0]
3486                .effective_station
3487                .as_ref()
3488                .map(|value| (value.value.as_str(), value.source)),
3489            Some(("right", RouteValueSource::GlobalOverride))
3490        );
3491        assert!(cards[0].effective_upstream_base_url.is_none());
3492    }
3493
3494    #[test]
3495    fn enrich_session_identity_cards_with_runtime_applies_station_mapping_and_single_upstream() {
3496        let mut cards = vec![SessionIdentityCard {
3497            session_id: Some("sid-1".to_string()),
3498            last_model: Some("gpt-5.4".to_string()),
3499            last_station_name: Some("right".to_string()),
3500            last_upstream_base_url: Some("https://right.example/v1".to_string()),
3501            effective_model: Some(ResolvedRouteValue::new(
3502                "gpt-5.4",
3503                RouteValueSource::RequestPayload,
3504            )),
3505            effective_station: Some(ResolvedRouteValue::new(
3506                "right",
3507                RouteValueSource::RuntimeFallback,
3508            )),
3509            ..SessionIdentityCard::default()
3510        }];
3511
3512        let mut mgr = ServiceConfigManager {
3513            active: Some("right".to_string()),
3514            ..ServiceConfigManager::default()
3515        };
3516        mgr.configs.insert(
3517            "right".to_string(),
3518            ServiceConfig {
3519                name: "right".to_string(),
3520                alias: None,
3521                enabled: true,
3522                level: 1,
3523                upstreams: vec![UpstreamConfig {
3524                    base_url: "https://right.example/v1".to_string(),
3525                    auth: UpstreamAuth::default(),
3526                    tags: HashMap::new(),
3527                    supported_models: HashMap::new(),
3528                    model_mapping: HashMap::from([(
3529                        "gpt-5.4".to_string(),
3530                        "gpt-5.4-fast".to_string(),
3531                    )]),
3532                }],
3533            },
3534        );
3535
3536        enrich_session_identity_cards_with_runtime(&mut cards, &mgr);
3537
3538        assert_eq!(
3539            cards[0]
3540                .effective_model
3541                .as_ref()
3542                .map(|value| value.value.as_str()),
3543            Some("gpt-5.4-fast")
3544        );
3545        assert_eq!(
3546            cards[0].effective_model.as_ref().map(|value| value.source),
3547            Some(RouteValueSource::StationMapping)
3548        );
3549        assert_eq!(
3550            cards[0]
3551                .effective_upstream_base_url
3552                .as_ref()
3553                .map(|value| value.value.as_str()),
3554            Some("https://right.example/v1")
3555        );
3556    }
3557
3558    #[test]
3559    fn apply_session_profile_binding_sets_binding_and_clears_manual_overrides() {
3560        let runtime = tokio::runtime::Runtime::new().expect("runtime");
3561        runtime.block_on(async {
3562            let state = ProxyState::new();
3563            let now_ms = 42;
3564            let mut mgr = ServiceConfigManager::default();
3565            mgr.configs.insert(
3566                "right".to_string(),
3567                ServiceConfig {
3568                    name: "right".to_string(),
3569                    alias: None,
3570                    enabled: true,
3571                    level: 1,
3572                    upstreams: vec![UpstreamConfig {
3573                        base_url: "https://right.example/v1".to_string(),
3574                        auth: UpstreamAuth::default(),
3575                        tags: HashMap::from([
3576                            ("supports_reasoning_effort".to_string(), "true".to_string()),
3577                            ("supports_service_tier".to_string(), "true".to_string()),
3578                        ]),
3579                        supported_models: HashMap::from([("gpt-5.4".to_string(), true)]),
3580                        model_mapping: HashMap::new(),
3581                    }],
3582                },
3583            );
3584            mgr.profiles.insert(
3585                "fast".to_string(),
3586                crate::config::ServiceControlProfile {
3587                    extends: None,
3588                    station: Some("right".to_string()),
3589                    model: Some("gpt-5.4".to_string()),
3590                    reasoning_effort: Some("low".to_string()),
3591                    service_tier: Some("flex".to_string()),
3592                },
3593            );
3594
3595            state
3596                .set_session_station_override("sid-1".to_string(), "other".to_string(), 1)
3597                .await;
3598            state
3599                .set_session_model_override("sid-1".to_string(), "gpt-x".to_string(), 1)
3600                .await;
3601            state
3602                .set_session_effort_override("sid-1".to_string(), "high".to_string(), 1)
3603                .await;
3604            state
3605                .set_session_service_tier_override("sid-1".to_string(), "priority".to_string(), 1)
3606                .await;
3607
3608            state
3609                .apply_session_profile_binding(
3610                    "codex",
3611                    &mgr,
3612                    "sid-1".to_string(),
3613                    "fast".to_string(),
3614                    now_ms,
3615                )
3616                .await
3617                .expect("apply profile");
3618
3619            let binding = state
3620                .get_session_binding("sid-1")
3621                .await
3622                .expect("binding exists");
3623            assert_eq!(binding.profile_name.as_deref(), Some("fast"));
3624            assert_eq!(binding.station_name.as_deref(), Some("right"));
3625            assert_eq!(binding.model.as_deref(), Some("gpt-5.4"));
3626            assert_eq!(binding.reasoning_effort.as_deref(), Some("low"));
3627            assert_eq!(binding.service_tier.as_deref(), Some("flex"));
3628            assert_eq!(
3629                binding.continuity_mode,
3630                SessionContinuityMode::ManualProfile
3631            );
3632            assert_eq!(binding.updated_at_ms, now_ms);
3633            assert!(state.get_session_station_override("sid-1").await.is_none());
3634            assert!(state.get_session_model_override("sid-1").await.is_none());
3635            assert!(state.get_session_effort_override("sid-1").await.is_none());
3636            assert!(
3637                state
3638                    .get_session_service_tier_override("sid-1")
3639                    .await
3640                    .is_none()
3641            );
3642        });
3643    }
3644
3645    #[test]
3646    fn list_session_manual_overrides_merges_all_dimensions() {
3647        let runtime = tokio::runtime::Runtime::new().expect("runtime");
3648        runtime.block_on(async {
3649            let state = ProxyState::new();
3650            state
3651                .set_session_reasoning_effort_override("sid-1".to_string(), "high".to_string(), 1)
3652                .await;
3653            state
3654                .set_session_station_override("sid-1".to_string(), "right".to_string(), 1)
3655                .await;
3656            state
3657                .set_session_model_override("sid-1".to_string(), "gpt-5.4".to_string(), 1)
3658                .await;
3659            state
3660                .set_session_service_tier_override("sid-1".to_string(), "priority".to_string(), 1)
3661                .await;
3662            state
3663                .set_session_model_override("sid-2".to_string(), "gpt-5.4-mini".to_string(), 2)
3664                .await;
3665
3666            let merged = state.list_session_manual_overrides().await;
3667            assert_eq!(merged.len(), 2);
3668            assert_eq!(
3669                merged
3670                    .get("sid-1")
3671                    .and_then(|overrides| overrides.reasoning_effort.as_deref()),
3672                Some("high")
3673            );
3674            assert_eq!(
3675                merged
3676                    .get("sid-1")
3677                    .and_then(|overrides| overrides.station_name.as_deref()),
3678                Some("right")
3679            );
3680            assert_eq!(
3681                merged
3682                    .get("sid-1")
3683                    .and_then(|overrides| overrides.model.as_deref()),
3684                Some("gpt-5.4")
3685            );
3686            assert_eq!(
3687                merged
3688                    .get("sid-1")
3689                    .and_then(|overrides| overrides.service_tier.as_deref()),
3690                Some("priority")
3691            );
3692            assert_eq!(
3693                merged
3694                    .get("sid-2")
3695                    .and_then(|overrides| overrides.model.as_deref()),
3696                Some("gpt-5.4-mini")
3697            );
3698            assert!(
3699                merged
3700                    .get("sid-2")
3701                    .is_some_and(|overrides| overrides.reasoning_effort.is_none())
3702            );
3703        });
3704    }
3705
3706    #[test]
3707    fn provider_balance_snapshots_are_recorded_and_refreshed() {
3708        let runtime = tokio::runtime::Runtime::new().expect("runtime");
3709        runtime.block_on(async {
3710            let state = ProxyState::new();
3711            state
3712                .record_provider_balance_snapshot(
3713                    "codex",
3714                    ProviderBalanceSnapshot {
3715                        provider_id: "packycode".to_string(),
3716                        station_name: Some("right".to_string()),
3717                        upstream_index: Some(2),
3718                        source: "usage_provider:budget_http_json".to_string(),
3719                        fetched_at_ms: 10,
3720                        stale_after_ms: Some(0),
3721                        stale: false,
3722                        status: BalanceSnapshotStatus::Unknown,
3723                        exhausted: Some(false),
3724                        exhaustion_affects_routing: true,
3725                        plan_name: None,
3726                        total_balance_usd: Some("3.5".to_string()),
3727                        subscription_balance_usd: None,
3728                        paygo_balance_usd: None,
3729                        monthly_budget_usd: Some("5".to_string()),
3730                        monthly_spent_usd: Some("1.5".to_string()),
3731                        quota_period: None,
3732                        quota_remaining_usd: None,
3733                        quota_limit_usd: None,
3734                        quota_used_usd: None,
3735                        unlimited_quota: None,
3736                        total_used_usd: None,
3737                        today_used_usd: None,
3738                        total_requests: None,
3739                        today_requests: None,
3740                        total_tokens: None,
3741                        today_tokens: None,
3742                        error: None,
3743                    },
3744                )
3745                .await;
3746
3747            let view = state.get_provider_balance_view("codex").await;
3748            let balances = view.get("right").expect("station balance");
3749            assert_eq!(balances.len(), 1);
3750            assert_eq!(balances[0].provider_id, "packycode");
3751            assert_eq!(balances[0].status, BalanceSnapshotStatus::Stale);
3752            assert_eq!(balances[0].exhausted, Some(false));
3753
3754            let summary = state
3755                .get_provider_balance_summary_view("codex")
3756                .await
3757                .get("right")
3758                .cloned()
3759                .expect("station balance summary");
3760            assert_eq!(summary.snapshots, 1);
3761            assert_eq!(summary.stale, 1);
3762            assert_eq!(summary.routing_snapshots, 1);
3763        });
3764    }
3765
3766    #[test]
3767    fn provider_balance_snapshots_keep_multiple_providers_per_upstream() {
3768        let runtime = tokio::runtime::Runtime::new().expect("runtime");
3769        runtime.block_on(async {
3770            let state = ProxyState::new();
3771            for (provider_id, status) in [
3772                ("general", BalanceSnapshotStatus::Error),
3773                ("newapi", BalanceSnapshotStatus::Ok),
3774            ] {
3775                state
3776                    .record_provider_balance_snapshot(
3777                        "codex",
3778                        ProviderBalanceSnapshot {
3779                            provider_id: provider_id.to_string(),
3780                            station_name: Some("routing".to_string()),
3781                            upstream_index: Some(1),
3782                            source: "usage_provider:test".to_string(),
3783                            fetched_at_ms: 10,
3784                            stale_after_ms: None,
3785                            stale: false,
3786                            status,
3787                            exhausted: if status == BalanceSnapshotStatus::Ok {
3788                                Some(false)
3789                            } else {
3790                                None
3791                            },
3792                            exhaustion_affects_routing: true,
3793                            plan_name: None,
3794                            total_balance_usd: if status == BalanceSnapshotStatus::Ok {
3795                                Some("3.5".to_string())
3796                            } else {
3797                                None
3798                            },
3799                            subscription_balance_usd: None,
3800                            paygo_balance_usd: None,
3801                            monthly_budget_usd: None,
3802                            monthly_spent_usd: None,
3803                            quota_period: None,
3804                            quota_remaining_usd: None,
3805                            quota_limit_usd: None,
3806                            quota_used_usd: None,
3807                            unlimited_quota: None,
3808                            total_used_usd: None,
3809                            today_used_usd: None,
3810                            total_requests: None,
3811                            today_requests: None,
3812                            total_tokens: None,
3813                            today_tokens: None,
3814                            error: if status == BalanceSnapshotStatus::Error {
3815                                Some("decode failed".to_string())
3816                            } else {
3817                                None
3818                            },
3819                        },
3820                    )
3821                    .await;
3822            }
3823
3824            let view = state.get_provider_balance_view("codex").await;
3825            let balances = view.get("routing").expect("station balance");
3826            assert_eq!(balances.len(), 2);
3827            assert_eq!(
3828                balances
3829                    .iter()
3830                    .map(|snapshot| snapshot.provider_id.as_str())
3831                    .collect::<Vec<_>>(),
3832                vec!["general", "newapi"]
3833            );
3834        });
3835    }
3836
3837    #[test]
3838    fn prune_runtime_observability_keeps_catalog_provider_balances_on_routing_layout_change() {
3839        let runtime = tokio::runtime::Runtime::new().expect("runtime");
3840        runtime.block_on(async {
3841            let state = ProxyState::new();
3842
3843            let mut initial_mgr = ServiceConfigManager::default();
3844            initial_mgr.configs.insert(
3845                "routing".to_string(),
3846                ServiceConfig {
3847                    name: "routing".to_string(),
3848                    alias: None,
3849                    enabled: true,
3850                    level: 1,
3851                    upstreams: vec![
3852                        UpstreamConfig {
3853                            base_url: "https://input.example/v1".to_string(),
3854                            auth: UpstreamAuth::default(),
3855                            tags: HashMap::from([("provider_id".to_string(), "input".to_string())]),
3856                            supported_models: HashMap::new(),
3857                            model_mapping: HashMap::new(),
3858                        },
3859                        UpstreamConfig {
3860                            base_url: "https://backup.example/v1".to_string(),
3861                            auth: UpstreamAuth::default(),
3862                            tags: HashMap::from([(
3863                                "provider_id".to_string(),
3864                                "backup".to_string(),
3865                            )]),
3866                            supported_models: HashMap::new(),
3867                            model_mapping: HashMap::new(),
3868                        },
3869                    ],
3870                },
3871            );
3872            state
3873                .prune_runtime_observability_for_service("codex", &initial_mgr)
3874                .await;
3875
3876            state
3877                .record_provider_balance_snapshot(
3878                    "codex",
3879                    ProviderBalanceSnapshot {
3880                        provider_id: "input".to_string(),
3881                        station_name: Some("input".to_string()),
3882                        upstream_index: Some(0),
3883                        source: "usage_provider:test".to_string(),
3884                        fetched_at_ms: 10,
3885                        stale_after_ms: None,
3886                        stale: false,
3887                        status: BalanceSnapshotStatus::Ok,
3888                        exhausted: Some(false),
3889                        exhaustion_affects_routing: true,
3890                        total_balance_usd: Some("3.5".to_string()),
3891                        ..ProviderBalanceSnapshot::default()
3892                    },
3893                )
3894                .await;
3895            state
3896                .record_provider_balance_snapshot(
3897                    "codex",
3898                    ProviderBalanceSnapshot {
3899                        provider_id: "input".to_string(),
3900                        station_name: Some("routing".to_string()),
3901                        upstream_index: Some(0),
3902                        source: "usage_provider:test".to_string(),
3903                        fetched_at_ms: 10,
3904                        stale_after_ms: None,
3905                        stale: false,
3906                        status: BalanceSnapshotStatus::Ok,
3907                        exhausted: Some(false),
3908                        exhaustion_affects_routing: true,
3909                        total_balance_usd: Some("3.5".to_string()),
3910                        ..ProviderBalanceSnapshot::default()
3911                    },
3912                )
3913                .await;
3914
3915            let mut pinned_mgr = ServiceConfigManager::default();
3916            pinned_mgr.configs.insert(
3917                "routing".to_string(),
3918                ServiceConfig {
3919                    name: "routing".to_string(),
3920                    alias: None,
3921                    enabled: true,
3922                    level: 1,
3923                    upstreams: vec![UpstreamConfig {
3924                        base_url: "https://input.example/v1".to_string(),
3925                        auth: UpstreamAuth::default(),
3926                        tags: HashMap::from([("provider_id".to_string(), "input".to_string())]),
3927                        supported_models: HashMap::new(),
3928                        model_mapping: HashMap::new(),
3929                    }],
3930                },
3931            );
3932            state
3933                .prune_runtime_observability_for_service("codex", &pinned_mgr)
3934                .await;
3935
3936            let view = state.get_provider_balance_view("codex").await;
3937            assert!(view.contains_key("input"));
3938            assert!(!view.contains_key("routing"));
3939        });
3940    }
3941
3942    #[test]
3943    fn prune_runtime_observability_removes_stale_service_keys() {
3944        let runtime = tokio::runtime::Runtime::new().expect("runtime");
3945        runtime.block_on(async {
3946            let state = ProxyState::new();
3947
3948            state
3949                .set_station_enabled_override("codex", "old".to_string(), false, 1)
3950                .await;
3951            state
3952                .set_upstream_enabled_override(
3953                    "codex",
3954                    "https://old.example/v1".to_string(),
3955                    false,
3956                    1,
3957                )
3958                .await;
3959            state
3960                .set_provider_endpoint_enabled_override(
3961                    "codex",
3962                    ProviderEndpointKey::new("codex", "provider-old", "default"),
3963                    false,
3964                    1,
3965                )
3966                .await;
3967            state
3968                .set_provider_endpoint_runtime_state_override(
3969                    "codex",
3970                    ProviderEndpointKey::new("codex", "provider-old", "default"),
3971                    RuntimeConfigState::BreakerOpen,
3972                    1,
3973                )
3974                .await;
3975            state
3976                .record_station_health(
3977                    "codex",
3978                    "old".to_string(),
3979                    StationHealth {
3980                        checked_at_ms: 10,
3981                        upstreams: vec![UpstreamHealth {
3982                            base_url: "https://old.example/v1".to_string(),
3983                            ok: Some(false),
3984                            status_code: Some(500),
3985                            latency_ms: Some(10),
3986                            error: Some("boom".to_string()),
3987                            passive: None,
3988                        }],
3989                    },
3990                )
3991                .await;
3992            state
3993                .record_passive_upstream_failure(PassiveUpstreamFailureRecord {
3994                    service_name: "codex".to_string(),
3995                    station_name: "old".to_string(),
3996                    base_url: "https://old.example/v1".to_string(),
3997                    status_code: Some(500),
3998                    error_class: Some("upstream_transport_error".to_string()),
3999                    error: Some("boom".to_string()),
4000                    now_ms: 20,
4001                })
4002                .await;
4003            state
4004                .record_provider_balance_snapshot(
4005                    "codex",
4006                    ProviderBalanceSnapshot {
4007                        provider_id: "provider-old".to_string(),
4008                        station_name: Some("old".to_string()),
4009                        upstream_index: Some(0),
4010                        source: "usage_provider:budget_http_json".to_string(),
4011                        fetched_at_ms: 10,
4012                        stale_after_ms: None,
4013                        stale: false,
4014                        status: BalanceSnapshotStatus::Ok,
4015                        exhausted: Some(false),
4016                        exhaustion_affects_routing: true,
4017                        plan_name: None,
4018                        total_balance_usd: Some("3.5".to_string()),
4019                        subscription_balance_usd: None,
4020                        paygo_balance_usd: None,
4021                        monthly_budget_usd: None,
4022                        monthly_spent_usd: None,
4023                        quota_period: None,
4024                        quota_remaining_usd: None,
4025                        quota_limit_usd: None,
4026                        quota_used_usd: None,
4027                        unlimited_quota: None,
4028                        total_used_usd: None,
4029                        today_used_usd: None,
4030                        total_requests: None,
4031                        today_requests: None,
4032                        total_tokens: None,
4033                        today_tokens: None,
4034                        error: None,
4035                    },
4036                )
4037                .await;
4038
4039            let request_id = state
4040                .begin_request(
4041                    "codex",
4042                    "POST",
4043                    "/v1/responses",
4044                    Some("sid-old".to_string()),
4045                    None,
4046                    None,
4047                    None,
4048                    Some("gpt-5".to_string()),
4049                    None,
4050                    None,
4051                    30,
4052                )
4053                .await;
4054            state
4055                .update_request_route(
4056                    request_id,
4057                    Some("old".to_string()),
4058                    Some("provider-old".to_string()),
4059                    "https://old.example/v1".to_string(),
4060                    None,
4061                )
4062                .await;
4063            state
4064                .finish_request(FinishRequestParams {
4065                    id: request_id,
4066                    status_code: 200,
4067                    duration_ms: 5,
4068                    ended_at_ms: 35,
4069                    observed_service_tier: None,
4070                    usage: None,
4071                    retry: None,
4072                    ttfb_ms: None,
4073                    streaming: false,
4074                })
4075                .await;
4076
4077            let mut mgr = ServiceConfigManager::default();
4078            mgr.configs.insert(
4079                "new".to_string(),
4080                ServiceConfig {
4081                    name: "new".to_string(),
4082                    alias: None,
4083                    enabled: true,
4084                    level: 1,
4085                    upstreams: vec![UpstreamConfig {
4086                        base_url: "https://new.example/v1".to_string(),
4087                        auth: UpstreamAuth::default(),
4088                        tags: HashMap::from([(
4089                            "provider_id".to_string(),
4090                            "provider-new".to_string(),
4091                        )]),
4092                        supported_models: HashMap::new(),
4093                        model_mapping: HashMap::new(),
4094                    }],
4095                },
4096            );
4097
4098            state
4099                .prune_runtime_observability_for_service("codex", &mgr)
4100                .await;
4101
4102            assert!(state.get_station_meta_overrides("codex").await.is_empty());
4103            assert!(state.get_upstream_meta_overrides("codex").await.is_empty());
4104            assert!(state.get_station_health("codex").await.is_empty());
4105            assert!(state.get_provider_balance_view("codex").await.is_empty());
4106
4107            let rollup = state.get_usage_rollup_view("codex", 10, 10).await;
4108            assert!(rollup.by_config.is_empty());
4109            assert!(rollup.by_provider.is_empty());
4110        });
4111    }
4112
4113    #[test]
4114    fn get_upstream_meta_overrides_merges_endpoint_and_legacy_base_url_entries() {
4115        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4116        runtime.block_on(async {
4117            let state = ProxyState::new();
4118
4119            state
4120                .set_provider_endpoint_enabled_override(
4121                    "codex",
4122                    ProviderEndpointKey::new("codex", "alpha", "default"),
4123                    false,
4124                    1,
4125                )
4126                .await;
4127            state
4128                .set_provider_endpoint_runtime_state_override(
4129                    "codex",
4130                    ProviderEndpointKey::new("codex", "alpha", "default"),
4131                    RuntimeConfigState::BreakerOpen,
4132                    2,
4133                )
4134                .await;
4135            state
4136                .set_upstream_enabled_override(
4137                    "codex",
4138                    "https://legacy.example/v1".to_string(),
4139                    true,
4140                    3,
4141                )
4142                .await;
4143            state
4144                .set_upstream_runtime_state_override(
4145                    "codex",
4146                    "https://legacy.example/v1".to_string(),
4147                    RuntimeConfigState::Draining,
4148                    4,
4149                )
4150                .await;
4151
4152            let overrides = state.get_upstream_meta_overrides("codex").await;
4153
4154            assert_eq!(
4155                overrides.get("codex/alpha/default"),
4156                Some(&(Some(false), Some(RuntimeConfigState::BreakerOpen)))
4157            );
4158            assert_eq!(
4159                overrides.get("https://legacy.example/v1"),
4160                Some(&(Some(true), Some(RuntimeConfigState::Draining)))
4161            );
4162        });
4163    }
4164
4165    #[test]
4166    fn provider_endpoint_runtime_health_is_keyed_by_provider_endpoint() {
4167        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4168        runtime.block_on(async {
4169            let state = ProxyState::new();
4170            let monthly = ProviderEndpointKey::new("codex", "monthly", "default");
4171            let fallback = ProviderEndpointKey::new("codex", "fallback", "default");
4172
4173            state
4174                .record_provider_endpoint_attempt_success("codex", fallback.clone(), 10)
4175                .await;
4176            state
4177                .set_provider_endpoint_usage_exhausted("codex", monthly.clone(), true)
4178                .await;
4179            state
4180                .record_provider_endpoint_attempt_failure(
4181                    "codex",
4182                    monthly.clone(),
4183                    0,
4184                    crate::lb::CooldownBackoff {
4185                        factor: 1,
4186                        max_secs: 0,
4187                    },
4188                )
4189                .await;
4190            state
4191                .record_provider_endpoint_attempt_failure(
4192                    "codex",
4193                    monthly.clone(),
4194                    0,
4195                    crate::lb::CooldownBackoff {
4196                        factor: 1,
4197                        max_secs: 0,
4198                    },
4199                )
4200                .await;
4201            state
4202                .record_provider_endpoint_attempt_failure(
4203                    "codex",
4204                    monthly.clone(),
4205                    30,
4206                    crate::lb::CooldownBackoff {
4207                        factor: 1,
4208                        max_secs: 0,
4209                    },
4210                )
4211                .await;
4212
4213            let runtime = state
4214                .route_plan_runtime_state_for_provider_endpoints("codex")
4215                .await;
4216            let monthly_state = runtime.provider_endpoint(&monthly);
4217            assert_eq!(monthly_state.failure_count, crate::lb::FAILURE_THRESHOLD);
4218            assert!(monthly_state.cooldown_active);
4219            assert!(monthly_state.usage_exhausted);
4220            assert_eq!(runtime.affinity_provider_endpoint(), Some(&fallback));
4221
4222            state
4223                .set_provider_endpoint_runtime_state_override(
4224                    "codex",
4225                    fallback.clone(),
4226                    RuntimeConfigState::BreakerOpen,
4227                    20,
4228                )
4229                .await;
4230            let runtime = state
4231                .route_plan_runtime_state_for_provider_endpoints("codex")
4232                .await;
4233            assert!(runtime.provider_endpoint(&fallback).runtime_disabled);
4234        });
4235    }
4236
4237    #[test]
4238    fn get_station_health_merges_passive_runtime_observations() {
4239        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4240        runtime.block_on(async {
4241            let state = ProxyState::new();
4242            state
4243                .record_station_health(
4244                    "codex",
4245                    "right".to_string(),
4246                    StationHealth {
4247                        checked_at_ms: 10,
4248                        upstreams: vec![UpstreamHealth {
4249                            base_url: "https://right.example/v1".to_string(),
4250                            ok: Some(true),
4251                            status_code: Some(200),
4252                            latency_ms: Some(120),
4253                            error: None,
4254                            passive: None,
4255                        }],
4256                    },
4257                )
4258                .await;
4259            state
4260                .record_passive_upstream_failure(PassiveUpstreamFailureRecord {
4261                    service_name: "codex".to_string(),
4262                    station_name: "right".to_string(),
4263                    base_url: "https://right.example/v1".to_string(),
4264                    status_code: Some(500),
4265                    error_class: Some("cloudflare_timeout".to_string()),
4266                    error: Some("upstream timed out".to_string()),
4267                    now_ms: 20,
4268                })
4269                .await;
4270            state
4271                .record_passive_upstream_success(
4272                    "codex",
4273                    "right",
4274                    "https://backup.example/v1",
4275                    Some(200),
4276                    30,
4277                )
4278                .await;
4279
4280            let health = state.get_station_health("codex").await;
4281            let right = health.get("right").expect("right health");
4282            assert_eq!(right.checked_at_ms, 30);
4283            assert_eq!(right.upstreams.len(), 2);
4284
4285            let primary = right
4286                .upstreams
4287                .iter()
4288                .find(|upstream| upstream.base_url == "https://right.example/v1")
4289                .expect("primary upstream");
4290            assert_eq!(primary.ok, Some(true));
4291            let primary_passive = primary.passive.as_ref().expect("primary passive");
4292            assert_eq!(primary_passive.state, PassiveHealthState::Degraded);
4293            assert_eq!(primary_passive.score, 50);
4294            assert_eq!(primary_passive.last_status_code, Some(500));
4295            assert_eq!(
4296                primary_passive.last_error_class.as_deref(),
4297                Some("cloudflare_timeout")
4298            );
4299
4300            let backup = right
4301                .upstreams
4302                .iter()
4303                .find(|upstream| upstream.base_url == "https://backup.example/v1")
4304                .expect("backup upstream");
4305            assert_eq!(backup.ok, None);
4306            let backup_passive = backup.passive.as_ref().expect("backup passive");
4307            assert_eq!(backup_passive.state, PassiveHealthState::Healthy);
4308            assert_eq!(backup_passive.score, 100);
4309            assert_eq!(backup_passive.last_status_code, Some(200));
4310        });
4311    }
4312
4313    #[test]
4314    fn passive_health_success_recovers_after_failure() {
4315        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4316        runtime.block_on(async {
4317            let state = ProxyState::new();
4318            state
4319                .record_passive_upstream_failure(PassiveUpstreamFailureRecord {
4320                    service_name: "codex".to_string(),
4321                    station_name: "right".to_string(),
4322                    base_url: "https://right.example/v1".to_string(),
4323                    status_code: Some(500),
4324                    error_class: Some("cloudflare_timeout".to_string()),
4325                    error: Some("upstream timed out".to_string()),
4326                    now_ms: 10,
4327                })
4328                .await;
4329            state
4330                .record_passive_upstream_success(
4331                    "codex",
4332                    "right",
4333                    "https://right.example/v1",
4334                    Some(200),
4335                    20,
4336                )
4337                .await;
4338
4339            let health = state.get_station_health("codex").await;
4340            let right = health.get("right").expect("right health");
4341            let upstream = right.upstreams.first().expect("upstream");
4342            let passive = upstream.passive.as_ref().expect("passive");
4343            assert_eq!(passive.state, PassiveHealthState::Healthy);
4344            assert_eq!(passive.score, 100);
4345            assert_eq!(passive.consecutive_failures, 0);
4346            assert_eq!(passive.last_success_at_ms, Some(20));
4347            assert_eq!(passive.last_failure_at_ms, Some(10));
4348            assert_eq!(passive.last_error_class, None);
4349            assert_eq!(passive.last_error, None);
4350        });
4351    }
4352
4353    #[test]
4354    fn apply_session_profile_binding_uses_inherited_profile_values() {
4355        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4356        runtime.block_on(async {
4357            let state = ProxyState::new();
4358            let mut mgr = ServiceConfigManager::default();
4359            mgr.configs.insert(
4360                "right".to_string(),
4361                ServiceConfig {
4362                    name: "right".to_string(),
4363                    alias: None,
4364                    enabled: true,
4365                    level: 1,
4366                    upstreams: vec![UpstreamConfig {
4367                        base_url: "https://right.example/v1".to_string(),
4368                        auth: UpstreamAuth::default(),
4369                        tags: HashMap::from([
4370                            ("supports_reasoning_effort".to_string(), "true".to_string()),
4371                            ("supports_service_tier".to_string(), "true".to_string()),
4372                        ]),
4373                        supported_models: HashMap::from([("gpt-5.4".to_string(), true)]),
4374                        model_mapping: HashMap::new(),
4375                    }],
4376                },
4377            );
4378            mgr.profiles.insert(
4379                "base".to_string(),
4380                crate::config::ServiceControlProfile {
4381                    extends: None,
4382                    station: Some("right".to_string()),
4383                    model: Some("gpt-5.4".to_string()),
4384                    reasoning_effort: None,
4385                    service_tier: Some("priority".to_string()),
4386                },
4387            );
4388            mgr.profiles.insert(
4389                "fast".to_string(),
4390                crate::config::ServiceControlProfile {
4391                    extends: Some("base".to_string()),
4392                    station: None,
4393                    model: None,
4394                    reasoning_effort: Some("low".to_string()),
4395                    service_tier: None,
4396                },
4397            );
4398
4399            state
4400                .apply_session_profile_binding(
4401                    "codex",
4402                    &mgr,
4403                    "sid-inherited".to_string(),
4404                    "fast".to_string(),
4405                    100,
4406                )
4407                .await
4408                .expect("apply inherited profile");
4409
4410            let binding = state
4411                .get_session_binding("sid-inherited")
4412                .await
4413                .expect("binding exists");
4414            assert_eq!(binding.profile_name.as_deref(), Some("fast"));
4415            assert_eq!(binding.station_name.as_deref(), Some("right"));
4416            assert_eq!(binding.model.as_deref(), Some("gpt-5.4"));
4417            assert_eq!(binding.reasoning_effort.as_deref(), Some("low"));
4418            assert_eq!(binding.service_tier.as_deref(), Some("priority"));
4419        });
4420    }
4421
4422    #[test]
4423    fn prune_periodic_keeps_sticky_binding_after_manual_override_ttl_expires() {
4424        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4425        runtime.block_on(async {
4426            let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(1, 0, 2_000));
4427            state
4428                .set_session_model_override("sid-sticky".to_string(), "gpt-5.4".to_string(), 0)
4429                .await;
4430            state
4431                .set_session_binding(SessionBinding {
4432                    session_id: "sid-sticky".to_string(),
4433                    profile_name: Some("daily".to_string()),
4434                    station_name: Some("right".to_string()),
4435                    model: Some("gpt-5.4".to_string()),
4436                    reasoning_effort: Some("medium".to_string()),
4437                    service_tier: Some("default".to_string()),
4438                    continuity_mode: SessionContinuityMode::DefaultProfile,
4439                    created_at_ms: 0,
4440                    updated_at_ms: 0,
4441                    last_seen_ms: 0,
4442                })
4443                .await;
4444
4445            state.prune_periodic().await;
4446
4447            assert!(
4448                state
4449                    .get_session_model_override("sid-sticky")
4450                    .await
4451                    .is_none()
4452            );
4453            assert!(state.get_session_binding("sid-sticky").await.is_some());
4454        });
4455    }
4456
4457    #[test]
4458    fn prune_periodic_honors_opt_in_binding_ttl() {
4459        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4460        runtime.block_on(async {
4461            let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(0, 1, 2_000));
4462            state
4463                .set_session_binding(SessionBinding {
4464                    session_id: "sid-expire".to_string(),
4465                    profile_name: Some("daily".to_string()),
4466                    station_name: Some("right".to_string()),
4467                    model: Some("gpt-5.4".to_string()),
4468                    reasoning_effort: Some("medium".to_string()),
4469                    service_tier: Some("default".to_string()),
4470                    continuity_mode: SessionContinuityMode::DefaultProfile,
4471                    created_at_ms: 0,
4472                    updated_at_ms: 0,
4473                    last_seen_ms: 0,
4474                })
4475                .await;
4476
4477            state.prune_periodic().await;
4478
4479            assert!(state.get_session_binding("sid-expire").await.is_none());
4480        });
4481    }
4482
4483    #[test]
4484    fn session_route_affinity_ttl_is_enforced_on_read() {
4485        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4486        runtime.block_on(async {
4487            let mut policy = test_runtime_policy(0, 0, 2_000);
4488            policy.session_route_affinity_ttl_ms = 1;
4489            let state = ProxyState::new_with_runtime_policy(None, policy);
4490            state
4491                .record_session_route_affinity_success(
4492                    "sid-expire",
4493                    SessionRouteAffinityTarget {
4494                        route_graph_key: "graph".to_string(),
4495                        provider_endpoint: ProviderEndpointKey::new("codex", "monthly", "default"),
4496                        upstream_base_url: "https://monthly.example/v1".to_string(),
4497                        route_path: vec!["monthly_first".to_string(), "monthly".to_string()],
4498                    },
4499                    Some("first_success".to_string()),
4500                    0,
4501                )
4502                .await;
4503
4504            assert!(
4505                state
4506                    .get_session_route_affinity("sid-expire")
4507                    .await
4508                    .is_none()
4509            );
4510            assert!(state.list_session_route_affinities().await.is_empty());
4511        });
4512    }
4513
4514    #[test]
4515    fn prune_periodic_caps_sticky_bindings_by_last_seen() {
4516        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4517        runtime.block_on(async {
4518            let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(0, 0, 2));
4519            for idx in 0..3 {
4520                state
4521                    .set_session_binding(SessionBinding {
4522                        session_id: format!("sid-{idx}"),
4523                        profile_name: Some("daily".to_string()),
4524                        station_name: Some("right".to_string()),
4525                        model: Some("gpt-5.4".to_string()),
4526                        reasoning_effort: Some("medium".to_string()),
4527                        service_tier: Some("default".to_string()),
4528                        continuity_mode: SessionContinuityMode::DefaultProfile,
4529                        created_at_ms: idx,
4530                        updated_at_ms: idx,
4531                        last_seen_ms: idx,
4532                    })
4533                    .await;
4534            }
4535
4536            state.prune_periodic().await;
4537
4538            assert!(state.get_session_binding("sid-0").await.is_none());
4539            assert!(state.get_session_binding("sid-1").await.is_some());
4540            assert!(state.get_session_binding("sid-2").await.is_some());
4541        });
4542    }
4543
4544    #[test]
4545    fn transcript_path_cache_records_negative_lookups() {
4546        let runtime = tokio::runtime::Runtime::new().expect("runtime");
4547        runtime.block_on(async {
4548            let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(0, 0, 2_000));
4549            let paths = state
4550                .resolve_host_transcript_paths_cached(&["missing-session".to_string()])
4551                .await;
4552
4553            assert_eq!(paths.get("missing-session"), Some(&None));
4554            let cache = state.session_transcript_path_cache.read().await;
4555            assert!(cache.contains_key("missing-session"));
4556        });
4557    }
4558}