1use std::collections::{HashMap, HashSet, VecDeque};
2use std::io::{Read, Seek, SeekFrom};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::sync::OnceLock;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use serde_json::Value as JsonValue;
10use tokio::sync::RwLock;
11use tokio::time::{Duration, interval};
12
13pub use crate::balance::{
14 BalanceSnapshotStatus, ProviderBalanceSnapshot, StationRoutingBalanceSummary,
15};
16use crate::config::ServiceConfigManager;
17use crate::lb::{COOLDOWN_SECS, CooldownBackoff, FAILURE_THRESHOLD, LbState};
18#[cfg(test)]
19use crate::pricing::CostBreakdown;
20use crate::pricing::{CostAdjustments, estimate_request_cost_from_operator_catalog_for_service};
21use crate::routing_ir::{RoutePlanRuntimeState, RoutePlanUpstreamRuntimeState};
22use crate::runtime_identity::ProviderEndpointKey;
23use crate::sessions;
24use crate::usage::UsageMetrics;
25
26mod runtime_types;
27mod session_identity;
28
29use self::runtime_types::{
30 ConfigMetaOverride, RuntimeDefaultProfileOverride, UsageRollup, merge_station_health,
31};
32pub use self::runtime_types::{
33 HealthCheckStatus, LbConfigView, LbUpstreamView, PassiveHealthState, PassiveUpstreamHealth,
34 RuntimeConfigState, StationHealth, UpstreamHealth, UsageBucket, UsageRollupCoverage,
35 UsageRollupView,
36};
37pub use self::session_identity::{
38 ActiveRequest, FinishRequestParams, FinishedRequest, RequestObservability, ResolvedRouteValue,
39 RouteDecisionProvenance, RouteValueSource, SessionBinding, SessionContinuityMode,
40 SessionIdentityCard, SessionIdentityCardBuildInputs, SessionManualOverrides,
41 SessionObservationScope, SessionRouteAffinity, SessionRouteAffinityTarget, SessionStats,
42 build_session_identity_cards_from_parts, enrich_session_identity_cards_with_host_transcripts,
43 enrich_session_identity_cards_with_runtime,
44};
45use self::session_identity::{
46 SessionBindingEntry, SessionCwdCacheEntry, SessionEffortOverride, SessionModelOverride,
47 SessionRouteTargetOverride, SessionServiceTierOverride, SessionStationOverride,
48};
49
/// Passive health observations: service name → station name → upstream base
/// URL → health record.
type PassiveStationHealthMap =
    HashMap<String, HashMap<String, HashMap<String, PassiveUpstreamHealth>>>;
/// Balance snapshots, four levels deep. NOTE(review): nesting presumably is
/// service → station → upstream index → account/provider key — confirm at the
/// use sites before relying on this reading.
type ProviderBalanceMap =
    HashMap<String, HashMap<String, HashMap<usize, HashMap<String, ProviderBalanceSnapshot>>>>;
/// service name → station name → routing balance summary.
type ProviderBalanceSummaryMap = HashMap<String, HashMap<String, StationRoutingBalanceSummary>>;
/// Sorted `(station_name, upstream base URLs)` pairs describing a service's
/// layout; built by `service_layout_signature` and diffed by
/// `changed_service_layout_stations`.
type ServiceLayoutSignature = Vec<(String, Vec<String>)>;
56
/// Per-endpoint health counters tracked at runtime (in-memory only).
#[derive(Debug, Clone, Default)]
struct ProviderEndpointRuntimeHealth {
    // Failure counter — presumably consecutive failures compared against
    // FAILURE_THRESHOLD; confirm in the routing/LB code.
    failure_count: u32,
    // While set and in the future, the endpoint should be skipped.
    cooldown_until: Option<std::time::Instant>,
    // True when the provider reported its usage/quota as exhausted.
    usage_exhausted: bool,
    // Consecutive penalty applications — presumably drives backoff growth
    // (see CooldownBackoff); confirm at use site.
    penalty_streak: u32,
    // Unix-ms timestamp of the last known-good response, if any.
    last_good_at_ms: Option<u64>,
}
65
/// Cached result of a session → transcript-path lookup.
#[derive(Debug, Clone)]
struct SessionTranscriptPathCacheEntry {
    // Resolved transcript path; `None` caches a failed lookup (negative cache).
    path: Option<String>,
    // Unix-ms of the last revalidation — presumably compared against
    // session_transcript_path_cache_ttl_ms; confirm at use site.
    last_checked_ms: u64,
    // Unix-ms of last access; used for LRU pruning via `prune_lru_cache`.
    last_seen_ms: u64,
}
72
/// TTL and capacity knobs for the session-scoped tables, resolved once at
/// construction from `CODEX_HELPER_*` env vars (see `new_with_lb_states`).
/// For route affinities a TTL of 0 disables expiry, and `prune_lru_cache`
/// treats a max-entries of 0 as unbounded; other fields presumably follow the
/// same convention — confirm at their use sites.
#[derive(Debug, Clone, Copy)]
struct RuntimePolicy {
    session_override_ttl_ms: u64,
    session_binding_ttl_ms: u64,
    session_binding_max_entries: usize,
    session_route_affinity_ttl_ms: u64,
    session_route_affinity_max_entries: usize,
    session_cwd_cache_ttl_ms: u64,
    session_cwd_cache_max_entries: usize,
    session_transcript_path_cache_ttl_ms: u64,
    session_transcript_path_cache_max_entries: usize,
}
85
/// One passively-observed upstream failure, reported from the proxy data path
/// (as opposed to an active health check).
pub struct PassiveUpstreamFailureRecord {
    pub service_name: String,
    pub station_name: String,
    // Base URL of the upstream that failed.
    pub base_url: String,
    // HTTP status, when the failure surfaced as an HTTP-level error.
    pub status_code: Option<u16>,
    // Coarse machine-readable error classification, when available.
    pub error_class: Option<String>,
    // Human-readable error detail, when available.
    pub error: Option<String>,
    // Unix-ms timestamp at which the failure was observed.
    pub now_ms: u64,
}
95
/// Capacity bound for the recent-finished request buffer.
///
/// Read once from `CODEX_HELPER_RECENT_FINISHED_MAX` (default 2000, zero and
/// unparsable values ignored), clamped into `[200, 20_000]`, and cached for
/// the lifetime of the process.
fn recent_finished_max() -> usize {
    static MAX: OnceLock<usize> = OnceLock::new();
    *MAX.get_or_init(|| {
        let configured = std::env::var("CODEX_HELPER_RECENT_FINISHED_MAX")
            .ok()
            .and_then(|raw| raw.trim().parse::<usize>().ok())
            .filter(|&n| n > 0);
        configured.unwrap_or(2_000).clamp(200, 20_000)
    })
}
107
/// Current wall-clock time as milliseconds since the Unix epoch; returns 0 if
/// the system clock reads earlier than the epoch.
fn unix_now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
114
/// Evicts least-recently-seen entries from `cache` until it holds at most
/// `max_entries` items.
///
/// `last_seen` extracts a recency timestamp (unix ms) from a value; entries
/// with the smallest timestamps are evicted first. `max_entries == 0`
/// disables pruning entirely.
fn prune_lru_cache<T>(
    cache: &mut HashMap<String, T>,
    max_entries: usize,
    last_seen: impl Fn(&T) -> u64,
) {
    if max_entries == 0 || cache.len() <= max_entries {
        return;
    }

    let mut keys = cache
        .iter()
        .map(|(key, value)| (key.clone(), last_seen(value)))
        .collect::<Vec<_>>();
    // Oldest first. Unstable sort: faster and allocation-free, and tie order
    // among equal timestamps was never defined (HashMap iteration order).
    keys.sort_unstable_by_key(|(_, seen)| *seen);
    // Safe subtraction: the early return above guarantees len > max_entries.
    let remove_count = keys.len() - max_entries;
    for (key, _) in keys.into_iter().take(remove_count) {
        cache.remove(&key);
    }
}
134
135fn service_layout_signature(mgr: &ServiceConfigManager) -> ServiceLayoutSignature {
136 let mut entries = mgr
137 .stations()
138 .iter()
139 .map(|(station_name, service)| {
140 (
141 station_name.clone(),
142 service
143 .upstreams
144 .iter()
145 .map(|upstream| upstream.base_url.clone())
146 .collect::<Vec<_>>(),
147 )
148 })
149 .collect::<Vec<_>>();
150 entries.sort_by(|left, right| left.0.cmp(&right.0).then_with(|| left.1.cmp(&right.1)));
151 entries
152}
153
/// Computes the set of station names whose upstream layout differs between
/// two layout signatures (`(station_name, upstream base URLs)` pairs).
///
/// A station counts as changed when it appears on only one side, or when its
/// upstream list differs between the two sides. Parameters are slices rather
/// than `&Vec` (clippy `ptr_arg`); `&ServiceLayoutSignature` arguments still
/// coerce at call sites.
fn changed_service_layout_stations(
    previous: &[(String, Vec<String>)],
    current: &[(String, Vec<String>)],
) -> HashSet<String> {
    let previous_by_station: HashMap<&str, &[String]> = previous
        .iter()
        .map(|(station_name, upstreams)| (station_name.as_str(), upstreams.as_slice()))
        .collect();
    let current_by_station: HashMap<&str, &[String]> = current
        .iter()
        .map(|(station_name, upstreams)| (station_name.as_str(), upstreams.as_slice()))
        .collect();

    let mut changed = HashSet::new();
    // Removed or modified stations (present in `previous`).
    for (station_name, upstreams) in previous {
        if current_by_station.get(station_name.as_str()) != Some(&upstreams.as_slice()) {
            changed.insert(station_name.clone());
        }
    }
    // Added or modified stations (present in `current`).
    for (station_name, upstreams) in current {
        if previous_by_station.get(station_name.as_str()) != Some(&upstreams.as_slice()) {
            changed.insert(station_name.clone());
        }
    }

    changed
}
188
/// Central mutable runtime state for the proxy: per-session overrides and
/// bindings, derived caches, request lifecycle tracking, health observations,
/// and balance snapshots. Every table sits behind its own
/// `tokio::sync::RwLock`, so unrelated subsystems never contend on one lock.
#[derive(Debug)]
pub struct ProxyState {
    // Monotonic id source — presumably keys `active_requests`; confirm where
    // requests are started.
    next_request_id: AtomicU64,
    // TTL / capacity policy, copied from `RuntimePolicy` at construction
    // (env-var sources documented in `new_with_lb_states`).
    session_override_ttl_ms: u64,
    session_binding_ttl_ms: u64,
    session_binding_max_entries: usize,
    session_route_affinity_ttl_ms: u64,
    session_route_affinity_max_entries: usize,
    session_cwd_cache_ttl_ms: u64,
    session_cwd_cache_max_entries: usize,
    session_transcript_path_cache_ttl_ms: u64,
    session_transcript_path_cache_max_entries: usize,
    // Per-session manual overrides, keyed by session id.
    session_effort_overrides: RwLock<HashMap<String, SessionEffortOverride>>,
    session_station_overrides: RwLock<HashMap<String, SessionStationOverride>>,
    session_route_target_overrides: RwLock<HashMap<String, SessionRouteTargetOverride>>,
    session_model_overrides: RwLock<HashMap<String, SessionModelOverride>>,
    session_service_tier_overrides: RwLock<HashMap<String, SessionServiceTierOverride>>,
    // Profile bindings keyed by session id; `set_session_binding` preserves
    // the original `created_at_ms` across re-binds.
    session_bindings: RwLock<HashMap<String, SessionBindingEntry>>,
    // Sticky route choices per session; expired entries are purged lazily on
    // read and the table is LRU-bounded on write.
    session_route_affinities: RwLock<HashMap<String, SessionRouteAffinity>>,
    // Process-wide overrides that apply to every session.
    global_station_override: RwLock<Option<String>>,
    global_route_target_override: RwLock<Option<String>>,
    // Default-profile override per service name.
    runtime_default_profiles: RwLock<HashMap<String, RuntimeDefaultProfileOverride>>,
    // Runtime config-metadata overrides: service → station/endpoint/upstream
    // key → override record.
    station_meta_overrides: RwLock<HashMap<String, HashMap<String, ConfigMetaOverride>>>,
    provider_endpoint_meta_overrides:
        RwLock<HashMap<String, HashMap<ProviderEndpointKey, ConfigMetaOverride>>>,
    upstream_meta_overrides: RwLock<HashMap<String, HashMap<String, ConfigMetaOverride>>>,
    // Session-derived caches, keyed by session id.
    session_cwd_cache: RwLock<HashMap<String, SessionCwdCacheEntry>>,
    session_transcript_path_cache: RwLock<HashMap<String, SessionTranscriptPathCacheEntry>>,
    session_stats: RwLock<HashMap<String, SessionStats>>,
    // Request lifecycle: in-flight requests plus a buffer of finished ones —
    // presumably bounded by `recent_finished_max()`; confirm where entries
    // are pushed.
    active_requests: RwLock<HashMap<u64, ActiveRequest>>,
    recent_finished: RwLock<VecDeque<FinishedRequest>>,
    usage_rollups: RwLock<HashMap<String, UsageRollup>>,
    // Health and balance tracking: active checks, passive observations, and
    // per-endpoint runtime counters.
    station_health: RwLock<HashMap<String, HashMap<String, StationHealth>>>,
    passive_station_health: RwLock<PassiveStationHealthMap>,
    provider_balances: RwLock<ProviderBalanceMap>,
    provider_balance_summaries: RwLock<ProviderBalanceSummaryMap>,
    provider_endpoint_runtime_health:
        RwLock<HashMap<String, HashMap<ProviderEndpointKey, ProviderEndpointRuntimeHealth>>>,
    station_health_checks: RwLock<HashMap<String, HashMap<String, HealthCheckStatus>>>,
    // Last observed layout per service, diffed to detect station/upstream
    // changes across config reloads.
    service_layout_signatures: RwLock<HashMap<String, ServiceLayoutSignature>>,
    // Shared load-balancer state, when the LB subsystem is enabled.
    lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
}
238
239impl ProxyState {
    // Cap on health records retained per station; enforcement happens where
    // records are appended (not visible in this chunk).
    const MAX_HEALTH_RECORDS_PER_STATION: usize = 200;
241
    /// Convenience constructor: a `ProxyState` with no shared load-balancer
    /// state. Not referenced from every build configuration, hence the
    /// `dead_code` allowance.
    #[allow(dead_code)]
    pub fn new() -> Arc<Self> {
        Self::new_with_lb_states(None)
    }
246
247 pub fn new_with_lb_states(
248 lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
249 ) -> Arc<Self> {
250 let ttl_secs = std::env::var("CODEX_HELPER_SESSION_OVERRIDE_TTL_SECS")
251 .ok()
252 .and_then(|s| s.trim().parse::<u64>().ok())
253 .filter(|&n| n > 0)
254 .unwrap_or(30 * 60);
255 let ttl_ms = ttl_secs.saturating_mul(1000);
256 let binding_ttl_secs = std::env::var("CODEX_HELPER_SESSION_BINDING_TTL_SECS")
257 .ok()
258 .and_then(|s| s.trim().parse::<u64>().ok())
259 .unwrap_or(0);
260 let binding_ttl_ms = binding_ttl_secs.saturating_mul(1000);
261 let binding_max_entries = std::env::var("CODEX_HELPER_SESSION_BINDING_MAX_ENTRIES")
262 .ok()
263 .and_then(|s| s.trim().parse::<usize>().ok())
264 .unwrap_or(2_000);
265 let route_affinity_ttl_secs = std::env::var("CODEX_HELPER_SESSION_ROUTE_AFFINITY_TTL_SECS")
266 .ok()
267 .and_then(|s| s.trim().parse::<u64>().ok())
268 .unwrap_or(0);
269 let route_affinity_ttl_ms = route_affinity_ttl_secs.saturating_mul(1000);
270 let route_affinity_max_entries =
271 std::env::var("CODEX_HELPER_SESSION_ROUTE_AFFINITY_MAX_ENTRIES")
272 .ok()
273 .and_then(|s| s.trim().parse::<usize>().ok())
274 .unwrap_or(5_000);
275
276 let cwd_cache_ttl_secs = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_TTL_SECS")
277 .ok()
278 .and_then(|s| s.trim().parse::<u64>().ok())
279 .unwrap_or(12 * 60 * 60);
280 let cwd_cache_ttl_ms = cwd_cache_ttl_secs.saturating_mul(1000);
281 let cwd_cache_max_entries = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_MAX_ENTRIES")
282 .ok()
283 .and_then(|s| s.trim().parse::<usize>().ok())
284 .unwrap_or(2_000);
285 let transcript_path_cache_ttl_secs =
286 std::env::var("CODEX_HELPER_SESSION_TRANSCRIPT_PATH_CACHE_TTL_SECS")
287 .ok()
288 .and_then(|s| s.trim().parse::<u64>().ok())
289 .unwrap_or(30);
290 let transcript_path_cache_ttl_ms = transcript_path_cache_ttl_secs.saturating_mul(1000);
291 let transcript_path_cache_max_entries =
292 std::env::var("CODEX_HELPER_SESSION_TRANSCRIPT_PATH_CACHE_MAX_ENTRIES")
293 .ok()
294 .and_then(|s| s.trim().parse::<usize>().ok())
295 .unwrap_or(5_000);
296
297 Self::new_with_runtime_policy(
298 lb_states,
299 RuntimePolicy {
300 session_override_ttl_ms: ttl_ms,
301 session_binding_ttl_ms: binding_ttl_ms,
302 session_binding_max_entries: binding_max_entries,
303 session_route_affinity_ttl_ms: route_affinity_ttl_ms,
304 session_route_affinity_max_entries: route_affinity_max_entries,
305 session_cwd_cache_ttl_ms: cwd_cache_ttl_ms,
306 session_cwd_cache_max_entries: cwd_cache_max_entries,
307 session_transcript_path_cache_ttl_ms: transcript_path_cache_ttl_ms,
308 session_transcript_path_cache_max_entries: transcript_path_cache_max_entries,
309 },
310 )
311 }
312
    /// Assembles the `ProxyState` from an explicit `RuntimePolicy`: copies
    /// the policy knobs, seeds the request-id counter at 1, and starts every
    /// runtime table empty.
    fn new_with_runtime_policy(
        lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
        policy: RuntimePolicy,
    ) -> Arc<Self> {
        Arc::new(Self {
            next_request_id: AtomicU64::new(1),
            session_override_ttl_ms: policy.session_override_ttl_ms,
            session_binding_ttl_ms: policy.session_binding_ttl_ms,
            session_binding_max_entries: policy.session_binding_max_entries,
            session_route_affinity_ttl_ms: policy.session_route_affinity_ttl_ms,
            session_route_affinity_max_entries: policy.session_route_affinity_max_entries,
            session_cwd_cache_ttl_ms: policy.session_cwd_cache_ttl_ms,
            session_cwd_cache_max_entries: policy.session_cwd_cache_max_entries,
            session_transcript_path_cache_ttl_ms: policy.session_transcript_path_cache_ttl_ms,
            session_transcript_path_cache_max_entries: policy
                .session_transcript_path_cache_max_entries,
            session_effort_overrides: RwLock::new(HashMap::new()),
            session_station_overrides: RwLock::new(HashMap::new()),
            session_route_target_overrides: RwLock::new(HashMap::new()),
            session_model_overrides: RwLock::new(HashMap::new()),
            session_service_tier_overrides: RwLock::new(HashMap::new()),
            session_bindings: RwLock::new(HashMap::new()),
            session_route_affinities: RwLock::new(HashMap::new()),
            global_station_override: RwLock::new(None),
            global_route_target_override: RwLock::new(None),
            runtime_default_profiles: RwLock::new(HashMap::new()),
            station_meta_overrides: RwLock::new(HashMap::new()),
            provider_endpoint_meta_overrides: RwLock::new(HashMap::new()),
            upstream_meta_overrides: RwLock::new(HashMap::new()),
            session_cwd_cache: RwLock::new(HashMap::new()),
            session_transcript_path_cache: RwLock::new(HashMap::new()),
            session_stats: RwLock::new(HashMap::new()),
            active_requests: RwLock::new(HashMap::new()),
            recent_finished: RwLock::new(VecDeque::new()),
            usage_rollups: RwLock::new(HashMap::new()),
            station_health: RwLock::new(HashMap::new()),
            passive_station_health: RwLock::new(HashMap::new()),
            provider_balances: RwLock::new(HashMap::new()),
            provider_balance_summaries: RwLock::new(HashMap::new()),
            provider_endpoint_runtime_health: RwLock::new(HashMap::new()),
            station_health_checks: RwLock::new(HashMap::new()),
            service_layout_signatures: RwLock::new(HashMap::new()),
            lb_states,
        })
    }
358
359 pub async fn get_session_effort_override(&self, session_id: &str) -> Option<String> {
360 let guard = self.session_effort_overrides.read().await;
361 guard.get(session_id).map(|v| v.effort.clone())
362 }
363
364 pub async fn get_session_reasoning_effort_override(&self, session_id: &str) -> Option<String> {
365 self.get_session_effort_override(session_id).await
366 }
367
368 pub async fn set_session_effort_override(
369 &self,
370 session_id: String,
371 effort: String,
372 now_ms: u64,
373 ) {
374 let mut guard = self.session_effort_overrides.write().await;
375 guard.insert(
376 session_id,
377 SessionEffortOverride {
378 effort,
379 updated_at_ms: now_ms,
380 last_seen_ms: now_ms,
381 },
382 );
383 }
384
385 pub async fn set_session_reasoning_effort_override(
386 &self,
387 session_id: String,
388 reasoning_effort: String,
389 now_ms: u64,
390 ) {
391 self.set_session_effort_override(session_id, reasoning_effort, now_ms)
392 .await;
393 }
394
395 pub async fn clear_session_effort_override(&self, session_id: &str) {
396 let mut guard = self.session_effort_overrides.write().await;
397 guard.remove(session_id);
398 }
399
400 pub async fn clear_session_reasoning_effort_override(&self, session_id: &str) {
401 self.clear_session_effort_override(session_id).await;
402 }
403
404 pub async fn list_session_effort_overrides(&self) -> HashMap<String, String> {
405 let guard = self.session_effort_overrides.read().await;
406 guard
407 .iter()
408 .map(|(k, v)| (k.clone(), v.effort.clone()))
409 .collect()
410 }
411
412 pub async fn list_session_reasoning_effort_overrides(&self) -> HashMap<String, String> {
413 self.list_session_effort_overrides().await
414 }
415
416 pub async fn touch_session_override(&self, session_id: &str, now_ms: u64) {
417 let mut guard = self.session_effort_overrides.write().await;
418 if let Some(v) = guard.get_mut(session_id) {
419 v.last_seen_ms = now_ms;
420 }
421 }
422
423 pub async fn touch_session_reasoning_effort_override(&self, session_id: &str, now_ms: u64) {
424 self.touch_session_override(session_id, now_ms).await;
425 }
426
427 pub async fn get_session_station_override(&self, session_id: &str) -> Option<String> {
428 let guard = self.session_station_overrides.read().await;
429 guard.get(session_id).map(|v| v.station_name.clone())
430 }
431
432 pub async fn get_session_route_target_override(&self, session_id: &str) -> Option<String> {
433 let guard = self.session_route_target_overrides.read().await;
434 guard.get(session_id).map(|v| v.target.clone())
435 }
436
437 pub async fn get_session_model_override(&self, session_id: &str) -> Option<String> {
438 let guard = self.session_model_overrides.read().await;
439 guard.get(session_id).map(|v| v.model.clone())
440 }
441
442 pub async fn set_session_model_override(&self, session_id: String, model: String, now_ms: u64) {
443 let mut guard = self.session_model_overrides.write().await;
444 guard.insert(
445 session_id,
446 SessionModelOverride {
447 model,
448 updated_at_ms: now_ms,
449 last_seen_ms: now_ms,
450 },
451 );
452 }
453
454 pub async fn clear_session_model_override(&self, session_id: &str) {
455 let mut guard = self.session_model_overrides.write().await;
456 guard.remove(session_id);
457 }
458
459 pub async fn list_session_model_overrides(&self) -> HashMap<String, String> {
460 let guard = self.session_model_overrides.read().await;
461 guard
462 .iter()
463 .map(|(k, v)| (k.clone(), v.model.clone()))
464 .collect()
465 }
466
467 pub async fn touch_session_model_override(&self, session_id: &str, now_ms: u64) {
468 let mut guard = self.session_model_overrides.write().await;
469 if let Some(v) = guard.get_mut(session_id) {
470 v.last_seen_ms = now_ms;
471 }
472 }
473
474 pub async fn get_session_service_tier_override(&self, session_id: &str) -> Option<String> {
475 let guard = self.session_service_tier_overrides.read().await;
476 guard.get(session_id).map(|v| v.service_tier.clone())
477 }
478
479 pub async fn set_session_service_tier_override(
480 &self,
481 session_id: String,
482 service_tier: String,
483 now_ms: u64,
484 ) {
485 let mut guard = self.session_service_tier_overrides.write().await;
486 guard.insert(
487 session_id,
488 SessionServiceTierOverride {
489 service_tier,
490 updated_at_ms: now_ms,
491 last_seen_ms: now_ms,
492 },
493 );
494 }
495
496 pub async fn clear_session_service_tier_override(&self, session_id: &str) {
497 let mut guard = self.session_service_tier_overrides.write().await;
498 guard.remove(session_id);
499 }
500
501 pub async fn list_session_service_tier_overrides(&self) -> HashMap<String, String> {
502 let guard = self.session_service_tier_overrides.read().await;
503 guard
504 .iter()
505 .map(|(k, v)| (k.clone(), v.service_tier.clone()))
506 .collect()
507 }
508
509 pub async fn touch_session_service_tier_override(&self, session_id: &str, now_ms: u64) {
510 let mut guard = self.session_service_tier_overrides.write().await;
511 if let Some(v) = guard.get_mut(session_id) {
512 v.last_seen_ms = now_ms;
513 }
514 }
515
516 pub async fn get_session_binding(&self, session_id: &str) -> Option<SessionBinding> {
517 let guard = self.session_bindings.read().await;
518 guard.get(session_id).map(|entry| entry.binding.clone())
519 }
520
521 pub async fn list_session_bindings(&self) -> HashMap<String, SessionBinding> {
522 let guard = self.session_bindings.read().await;
523 guard
524 .iter()
525 .map(|(sid, entry)| (sid.clone(), entry.binding.clone()))
526 .collect()
527 }
528
529 pub async fn get_session_route_affinity(
530 &self,
531 session_id: &str,
532 ) -> Option<SessionRouteAffinity> {
533 let mut guard = self.session_route_affinities.write().await;
534 let affinity = guard.get(session_id).cloned()?;
535 if self.session_route_affinity_is_expired(&affinity, unix_now_ms()) {
536 guard.remove(session_id);
537 return None;
538 }
539 Some(affinity)
540 }
541
    /// Snapshot of all live session route affinities. Takes the write lock so
    /// expired entries can be purged before the map is cloned.
    pub async fn list_session_route_affinities(&self) -> HashMap<String, SessionRouteAffinity> {
        let mut guard = self.session_route_affinities.write().await;
        let now_ms = unix_now_ms();
        guard.retain(|_, affinity| !self.session_route_affinity_is_expired(affinity, now_ms));
        guard.clone()
    }
548
    /// Records that `target` served `session_id` successfully and returns the
    /// (possibly updated) affinity entry.
    ///
    /// When the stored affinity already points at the same target, only
    /// `last_selected_at_ms` is refreshed. Otherwise a fresh entry is written
    /// whose change reason is `reason_hint` when provided, else
    /// "target_changed" (replacing a different target) or "first_success"
    /// (no prior entry). After insertion the table is LRU-pruned to
    /// `session_route_affinity_max_entries`.
    pub async fn record_session_route_affinity_success(
        &self,
        session_id: &str,
        target: SessionRouteAffinityTarget,
        reason_hint: Option<String>,
        now_ms: u64,
    ) -> SessionRouteAffinity {
        let mut guard = self.session_route_affinities.write().await;
        let reason = match guard.get_mut(session_id) {
            // Same target: refresh the selection timestamp and return early.
            Some(existing) if target.same_target(existing) => {
                existing.last_selected_at_ms = now_ms;
                return existing.clone();
            }
            Some(_) => reason_hint.unwrap_or_else(|| "target_changed".to_string()),
            None => reason_hint.unwrap_or_else(|| "first_success".to_string()),
        };

        let affinity = SessionRouteAffinity {
            route_graph_key: target.route_graph_key,
            provider_endpoint: target.provider_endpoint,
            upstream_base_url: target.upstream_base_url,
            route_path: target.route_path,
            last_selected_at_ms: now_ms,
            last_changed_at_ms: now_ms,
            change_reason: reason,
        };
        guard.insert(session_id.to_string(), affinity.clone());
        // Bound the table; least-recently-selected entries are evicted first.
        prune_lru_cache(
            &mut guard,
            self.session_route_affinity_max_entries,
            |entry| entry.last_selected_at_ms,
        );
        affinity
    }
583
584 fn session_route_affinity_is_expired(
585 &self,
586 affinity: &SessionRouteAffinity,
587 now_ms: u64,
588 ) -> bool {
589 self.session_route_affinity_ttl_ms > 0
590 && now_ms.saturating_sub(affinity.last_selected_at_ms)
591 >= self.session_route_affinity_ttl_ms
592 }
593
594 pub async fn set_session_binding(&self, binding: SessionBinding) {
595 let mut guard = self.session_bindings.write().await;
596 let binding = if let Some(existing) = guard.get(binding.session_id.as_str()) {
597 SessionBinding {
598 created_at_ms: existing.binding.created_at_ms,
599 ..binding
600 }
601 } else {
602 binding
603 };
604 guard.insert(binding.session_id.clone(), SessionBindingEntry { binding });
605 }
606
607 pub async fn clear_session_binding(&self, session_id: &str) {
608 let mut guard = self.session_bindings.write().await;
609 guard.remove(session_id);
610 }
611
612 pub async fn clear_session_manual_overrides(&self, session_id: &str) {
613 self.clear_session_station_override(session_id).await;
614 self.clear_session_route_target_override(session_id).await;
615 self.clear_session_model_override(session_id).await;
616 self.clear_session_effort_override(session_id).await;
617 self.clear_session_service_tier_override(session_id).await;
618 }
619
    /// Collects the five per-session manual overrides for `session_id` into a
    /// single `SessionManualOverrides`. The lookups run concurrently, each
    /// briefly taking its own read lock.
    pub async fn get_session_manual_overrides(&self, session_id: &str) -> SessionManualOverrides {
        let (reasoning_effort, station_name, route_target, model, service_tier) = tokio::join!(
            self.get_session_reasoning_effort_override(session_id),
            self.get_session_station_override(session_id),
            self.get_session_route_target_override(session_id),
            self.get_session_model_override(session_id),
            self.get_session_service_tier_override(session_id),
        );

        SessionManualOverrides {
            reasoning_effort,
            station_name,
            route_target,
            model,
            service_tier,
        }
    }
637
    /// Merges the five per-session override tables into one
    /// `session_id → SessionManualOverrides` map, dropping sessions whose
    /// merged override set ends up empty.
    pub async fn list_session_manual_overrides(&self) -> HashMap<String, SessionManualOverrides> {
        // Fetch the five snapshots concurrently; each briefly takes its own read lock.
        let (reasoning_effort_map, station_map, route_target_map, model_map, service_tier_map) = tokio::join!(
            self.list_session_reasoning_effort_overrides(),
            self.list_session_station_overrides(),
            self.list_session_route_target_overrides(),
            self.list_session_model_overrides(),
            self.list_session_service_tier_overrides(),
        );

        // Fold each snapshot into the merged map, creating entries on demand.
        let mut merged = HashMap::<String, SessionManualOverrides>::new();
        for (session_id, reasoning_effort) in reasoning_effort_map {
            merged.entry(session_id).or_default().reasoning_effort = Some(reasoning_effort);
        }
        for (session_id, station_name) in station_map {
            merged.entry(session_id).or_default().station_name = Some(station_name);
        }
        for (session_id, route_target) in route_target_map {
            merged.entry(session_id).or_default().route_target = Some(route_target);
        }
        for (session_id, model) in model_map {
            merged.entry(session_id).or_default().model = Some(model);
        }
        for (session_id, service_tier) in service_tier_map {
            merged.entry(session_id).or_default().service_tier = Some(service_tier);
        }
        merged.retain(|_, overrides| !overrides.is_empty());
        merged
    }
666
    /// Binds `session_id` to the named service profile.
    ///
    /// Resolves the profile and validates its station compatibility first
    /// (propagating any error before state is mutated), then writes a
    /// `ManualProfile` binding carrying the profile's station/model/effort/
    /// tier, and finally clears all per-session manual overrides so the
    /// profile becomes authoritative.
    ///
    /// # Errors
    /// Returns an error when the profile cannot be resolved or is not
    /// compatible with the service's stations.
    pub async fn apply_session_profile_binding(
        &self,
        service_name: &str,
        mgr: &ServiceConfigManager,
        session_id: String,
        profile_name: String,
        now_ms: u64,
    ) -> anyhow::Result<()> {
        let profile = crate::config::resolve_service_profile(mgr, profile_name.as_str())?;
        crate::config::validate_profile_station_compatibility(
            service_name,
            mgr,
            profile_name.as_str(),
            &profile,
        )?;

        self.set_session_binding(SessionBinding {
            session_id: session_id.clone(),
            profile_name: Some(profile_name),
            station_name: profile.station.clone(),
            model: profile.model.clone(),
            reasoning_effort: profile.reasoning_effort.clone(),
            service_tier: profile.service_tier.clone(),
            continuity_mode: SessionContinuityMode::ManualProfile,
            created_at_ms: now_ms,
            updated_at_ms: now_ms,
            last_seen_ms: now_ms,
        })
        .await;
        self.clear_session_manual_overrides(session_id.as_str())
            .await;
        Ok(())
    }
700
701 pub async fn touch_session_binding(&self, session_id: &str, now_ms: u64) {
702 let mut guard = self.session_bindings.write().await;
703 if let Some(entry) = guard.get_mut(session_id) {
704 entry.binding.last_seen_ms = now_ms;
705 }
706 }
707
708 pub async fn set_session_station_override(
709 &self,
710 session_id: String,
711 station_name: String,
712 now_ms: u64,
713 ) {
714 let mut guard = self.session_station_overrides.write().await;
715 guard.insert(
716 session_id,
717 SessionStationOverride {
718 station_name,
719 updated_at_ms: now_ms,
720 last_seen_ms: now_ms,
721 },
722 );
723 }
724
725 pub async fn set_session_route_target_override(
726 &self,
727 session_id: String,
728 target: String,
729 now_ms: u64,
730 ) {
731 let mut guard = self.session_route_target_overrides.write().await;
732 guard.insert(
733 session_id,
734 SessionRouteTargetOverride {
735 target,
736 updated_at_ms: now_ms,
737 last_seen_ms: now_ms,
738 },
739 );
740 }
741
742 pub async fn clear_session_station_override(&self, session_id: &str) {
743 let mut guard = self.session_station_overrides.write().await;
744 guard.remove(session_id);
745 }
746
747 pub async fn clear_session_route_target_override(&self, session_id: &str) {
748 let mut guard = self.session_route_target_overrides.write().await;
749 guard.remove(session_id);
750 }
751
752 pub async fn list_session_station_overrides(&self) -> HashMap<String, String> {
753 let guard = self.session_station_overrides.read().await;
754 guard
755 .iter()
756 .map(|(k, v)| (k.clone(), v.station_name.clone()))
757 .collect()
758 }
759
760 pub async fn list_session_route_target_overrides(&self) -> HashMap<String, String> {
761 let guard = self.session_route_target_overrides.read().await;
762 guard
763 .iter()
764 .map(|(k, v)| (k.clone(), v.target.clone()))
765 .collect()
766 }
767
768 pub async fn touch_session_station_override(&self, session_id: &str, now_ms: u64) {
769 let mut guard = self.session_station_overrides.write().await;
770 if let Some(v) = guard.get_mut(session_id) {
771 v.last_seen_ms = now_ms;
772 }
773 }
774
775 pub async fn touch_session_route_target_override(&self, session_id: &str, now_ms: u64) {
776 let mut guard = self.session_route_target_overrides.write().await;
777 if let Some(v) = guard.get_mut(session_id) {
778 v.last_seen_ms = now_ms;
779 }
780 }
781
782 pub async fn get_global_station_override(&self) -> Option<String> {
783 let guard = self.global_station_override.read().await;
784 guard.clone()
785 }
786
787 pub async fn get_global_route_target_override(&self) -> Option<String> {
788 let guard = self.global_route_target_override.read().await;
789 guard.clone()
790 }
791
792 pub async fn set_global_station_override(&self, station_name: String, _now_ms: u64) {
793 let mut guard = self.global_station_override.write().await;
794 *guard = Some(station_name);
795 }
796
797 pub async fn set_global_route_target_override(&self, target: String, _now_ms: u64) {
798 let mut guard = self.global_route_target_override.write().await;
799 *guard = Some(target);
800 }
801
802 pub async fn clear_global_station_override(&self) {
803 let mut guard = self.global_station_override.write().await;
804 *guard = None;
805 }
806
807 pub async fn clear_global_route_target_override(&self) {
808 let mut guard = self.global_route_target_override.write().await;
809 *guard = None;
810 }
811
812 pub async fn get_runtime_default_profile_override(&self, service_name: &str) -> Option<String> {
813 let guard = self.runtime_default_profiles.read().await;
814 guard
815 .get(service_name)
816 .map(|entry| entry.profile_name.clone())
817 }
818
819 pub async fn set_runtime_default_profile_override(
820 &self,
821 service_name: String,
822 profile_name: String,
823 now_ms: u64,
824 ) {
825 let mut guard = self.runtime_default_profiles.write().await;
826 guard.insert(
827 service_name,
828 RuntimeDefaultProfileOverride {
829 profile_name,
830 updated_at_ms: now_ms,
831 },
832 );
833 }
834
835 pub async fn clear_runtime_default_profile_override(&self, service_name: &str) {
836 let mut guard = self.runtime_default_profiles.write().await;
837 guard.remove(service_name);
838 }
839
840 pub async fn set_station_enabled_override(
841 &self,
842 service_name: &str,
843 station_name: String,
844 enabled: bool,
845 now_ms: u64,
846 ) {
847 let mut guard = self.station_meta_overrides.write().await;
848 let per_service = guard.entry(service_name.to_string()).or_default();
849 let entry = per_service.entry(station_name).or_default();
850 entry.enabled = Some(enabled);
851 entry.updated_at_ms = now_ms;
852 }
853
854 pub async fn set_station_level_override(
855 &self,
856 service_name: &str,
857 station_name: String,
858 level: u8,
859 now_ms: u64,
860 ) {
861 let mut guard = self.station_meta_overrides.write().await;
862 let per_service = guard.entry(service_name.to_string()).or_default();
863 let entry = per_service.entry(station_name).or_default();
864 entry.level = Some(level.clamp(1, 10));
865 entry.updated_at_ms = now_ms;
866 }
867
868 pub async fn set_station_runtime_state_override(
869 &self,
870 service_name: &str,
871 station_name: String,
872 state: RuntimeConfigState,
873 now_ms: u64,
874 ) {
875 let mut guard = self.station_meta_overrides.write().await;
876 let per_service = guard.entry(service_name.to_string()).or_default();
877 let entry = per_service.entry(station_name).or_default();
878 entry.state = Some(state);
879 entry.updated_at_ms = now_ms;
880 }
881
882 pub async fn clear_station_enabled_override(&self, service_name: &str, station_name: &str) {
883 let mut guard = self.station_meta_overrides.write().await;
884 let Some(per_service) = guard.get_mut(service_name) else {
885 return;
886 };
887 let Some(entry) = per_service.get_mut(station_name) else {
888 return;
889 };
890 entry.enabled = None;
891 if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
892 per_service.remove(station_name);
893 }
894 if per_service.is_empty() {
895 guard.remove(service_name);
896 }
897 }
898
899 pub async fn clear_station_level_override(&self, service_name: &str, station_name: &str) {
900 let mut guard = self.station_meta_overrides.write().await;
901 let Some(per_service) = guard.get_mut(service_name) else {
902 return;
903 };
904 let Some(entry) = per_service.get_mut(station_name) else {
905 return;
906 };
907 entry.level = None;
908 if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
909 per_service.remove(station_name);
910 }
911 if per_service.is_empty() {
912 guard.remove(service_name);
913 }
914 }
915
916 pub async fn clear_station_runtime_state_override(
917 &self,
918 service_name: &str,
919 station_name: &str,
920 ) {
921 let mut guard = self.station_meta_overrides.write().await;
922 let Some(per_service) = guard.get_mut(service_name) else {
923 return;
924 };
925 let Some(entry) = per_service.get_mut(station_name) else {
926 return;
927 };
928 entry.state = None;
929 if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
930 per_service.remove(station_name);
931 }
932 if per_service.is_empty() {
933 guard.remove(service_name);
934 }
935 }
936
937 pub async fn get_station_meta_overrides(
938 &self,
939 service_name: &str,
940 ) -> HashMap<String, (Option<bool>, Option<u8>)> {
941 let guard = self.station_meta_overrides.read().await;
942 guard
943 .get(service_name)
944 .map(|m| {
945 m.iter()
946 .map(|(k, v)| (k.clone(), (v.enabled, v.level)))
947 .collect::<HashMap<_, _>>()
948 })
949 .unwrap_or_default()
950 }
951
952 pub async fn get_station_runtime_state_overrides(
953 &self,
954 service_name: &str,
955 ) -> HashMap<String, RuntimeConfigState> {
956 let guard = self.station_meta_overrides.read().await;
957 guard
958 .get(service_name)
959 .map(|m| {
960 m.iter()
961 .filter_map(|(k, v)| v.state.map(|state| (k.clone(), state)))
962 .collect::<HashMap<_, _>>()
963 })
964 .unwrap_or_default()
965 }
966
967 pub async fn set_provider_endpoint_enabled_override(
968 &self,
969 service_name: &str,
970 endpoint_key: ProviderEndpointKey,
971 enabled: bool,
972 now_ms: u64,
973 ) {
974 let mut guard = self.provider_endpoint_meta_overrides.write().await;
975 let per_service = guard.entry(service_name.to_string()).or_default();
976 let entry = per_service.entry(endpoint_key).or_default();
977 entry.enabled = Some(enabled);
978 entry.updated_at_ms = now_ms;
979 }
980
981 pub async fn clear_provider_endpoint_enabled_override(
982 &self,
983 service_name: &str,
984 endpoint_key: &ProviderEndpointKey,
985 ) {
986 let mut guard = self.provider_endpoint_meta_overrides.write().await;
987 let Some(per_service) = guard.get_mut(service_name) else {
988 return;
989 };
990 let Some(entry) = per_service.get_mut(endpoint_key) else {
991 return;
992 };
993 entry.enabled = None;
994 if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
995 per_service.remove(endpoint_key);
996 }
997 if per_service.is_empty() {
998 guard.remove(service_name);
999 }
1000 }
1001
1002 pub async fn set_provider_endpoint_runtime_state_override(
1003 &self,
1004 service_name: &str,
1005 endpoint_key: ProviderEndpointKey,
1006 state: RuntimeConfigState,
1007 now_ms: u64,
1008 ) {
1009 let mut guard = self.provider_endpoint_meta_overrides.write().await;
1010 let per_service = guard.entry(service_name.to_string()).or_default();
1011 let entry = per_service.entry(endpoint_key).or_default();
1012 entry.state = Some(state);
1013 entry.updated_at_ms = now_ms;
1014 }
1015
1016 pub async fn clear_provider_endpoint_runtime_state_override(
1017 &self,
1018 service_name: &str,
1019 endpoint_key: &ProviderEndpointKey,
1020 ) {
1021 let mut guard = self.provider_endpoint_meta_overrides.write().await;
1022 let Some(per_service) = guard.get_mut(service_name) else {
1023 return;
1024 };
1025 let Some(entry) = per_service.get_mut(endpoint_key) else {
1026 return;
1027 };
1028 entry.state = None;
1029 if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
1030 per_service.remove(endpoint_key);
1031 }
1032 if per_service.is_empty() {
1033 guard.remove(service_name);
1034 }
1035 }
1036
1037 pub async fn set_upstream_enabled_override(
1038 &self,
1039 service_name: &str,
1040 base_url: String,
1041 enabled: bool,
1042 now_ms: u64,
1043 ) {
1044 let mut guard = self.upstream_meta_overrides.write().await;
1045 let per_service = guard.entry(service_name.to_string()).or_default();
1046 let entry = per_service.entry(base_url).or_default();
1047 entry.enabled = Some(enabled);
1048 entry.updated_at_ms = now_ms;
1049 }
1050
1051 pub async fn clear_upstream_enabled_override(&self, service_name: &str, base_url: &str) {
1052 let mut guard = self.upstream_meta_overrides.write().await;
1053 let Some(per_service) = guard.get_mut(service_name) else {
1054 return;
1055 };
1056 let Some(entry) = per_service.get_mut(base_url) else {
1057 return;
1058 };
1059 entry.enabled = None;
1060 if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
1061 per_service.remove(base_url);
1062 }
1063 if per_service.is_empty() {
1064 guard.remove(service_name);
1065 }
1066 }
1067
1068 pub async fn set_upstream_runtime_state_override(
1069 &self,
1070 service_name: &str,
1071 base_url: String,
1072 state: RuntimeConfigState,
1073 now_ms: u64,
1074 ) {
1075 let mut guard = self.upstream_meta_overrides.write().await;
1076 let per_service = guard.entry(service_name.to_string()).or_default();
1077 let entry = per_service.entry(base_url).or_default();
1078 entry.state = Some(state);
1079 entry.updated_at_ms = now_ms;
1080 }
1081
1082 pub async fn clear_upstream_runtime_state_override(&self, service_name: &str, base_url: &str) {
1083 let mut guard = self.upstream_meta_overrides.write().await;
1084 let Some(per_service) = guard.get_mut(service_name) else {
1085 return;
1086 };
1087 let Some(entry) = per_service.get_mut(base_url) else {
1088 return;
1089 };
1090 entry.state = None;
1091 if entry.enabled.is_none() && entry.level.is_none() && entry.state.is_none() {
1092 per_service.remove(base_url);
1093 }
1094 if per_service.is_empty() {
1095 guard.remove(service_name);
1096 }
1097 }
1098
1099 pub async fn get_upstream_meta_overrides(
1100 &self,
1101 service_name: &str,
1102 ) -> HashMap<String, (Option<bool>, Option<RuntimeConfigState>)> {
1103 let mut overrides = HashMap::new();
1104
1105 {
1106 let guard = self.upstream_meta_overrides.read().await;
1107 if let Some(per_service) = guard.get(service_name) {
1108 overrides.extend(
1109 per_service
1110 .iter()
1111 .map(|(k, v)| (k.clone(), (v.enabled, v.state))),
1112 );
1113 }
1114 }
1115
1116 {
1117 let guard = self.provider_endpoint_meta_overrides.read().await;
1118 if let Some(per_service) = guard.get(service_name) {
1119 overrides.extend(
1120 per_service
1121 .iter()
1122 .map(|(k, v)| (k.stable_key(), (v.enabled, v.state))),
1123 );
1124 }
1125 }
1126
1127 overrides
1128 }
1129
    /// Build the route-plan runtime view for every provider endpoint of
    /// `service_name`: per-endpoint failure/cooldown/usage flags plus the
    /// affinity endpoint (the one with the most recent success timestamp).
    pub async fn route_plan_runtime_state_for_provider_endpoints(
        &self,
        service_name: &str,
    ) -> RoutePlanRuntimeState {
        let mut runtime = RoutePlanRuntimeState::default();
        let now = std::time::Instant::now();

        {
            // Write lock on purpose: expired cooldowns are reset in place as a
            // side effect of reading.
            let mut guard = self.provider_endpoint_runtime_health.write().await;
            if let Some(per_service) = guard.get_mut(service_name) {
                // Tracks the endpoint with the newest last_good_at_ms.
                let mut affinity: Option<(ProviderEndpointKey, u64)> = None;
                for (endpoint_key, health) in per_service.iter_mut() {
                    // Cooldown deadline passed: clear it and the failure count.
                    if health.cooldown_until.is_some_and(|until| now >= until) {
                        health.failure_count = 0;
                        health.cooldown_until = None;
                    }
                    let cooldown_active = health.cooldown_until.is_some_and(|until| now < until);
                    runtime.set_provider_endpoint(
                        endpoint_key.clone(),
                        RoutePlanUpstreamRuntimeState {
                            // Overrides are applied in the second pass below.
                            runtime_disabled: false,
                            failure_count: health.failure_count,
                            cooldown_active,
                            usage_exhausted: health.usage_exhausted,
                            missing_auth: false,
                        },
                    );
                    // Keep the strictly-latest success as the affinity target.
                    if let Some(last_good_at_ms) = health.last_good_at_ms
                        && affinity
                            .as_ref()
                            .is_none_or(|(_, current)| last_good_at_ms > *current)
                    {
                        affinity = Some((endpoint_key.clone(), last_good_at_ms));
                    }
                }
                if let Some((endpoint_key, _)) = affinity {
                    runtime.set_affinity_provider_endpoint(Some(endpoint_key));
                }
            }
        }

        {
            // Overlay manual overrides: explicitly disabled, or any
            // non-Normal runtime state, marks the endpoint runtime-disabled.
            let guard = self.provider_endpoint_meta_overrides.read().await;
            if let Some(per_service) = guard.get(service_name) {
                for (endpoint_key, meta) in per_service {
                    let mut upstream_state = runtime.provider_endpoint(endpoint_key);
                    if meta.enabled == Some(false)
                        || meta
                            .state
                            .is_some_and(|state| state != RuntimeConfigState::Normal)
                    {
                        upstream_state.runtime_disabled = true;
                    }
                    runtime.set_provider_endpoint(endpoint_key.clone(), upstream_state);
                }
            }
        }

        runtime
    }
1190
1191 pub async fn record_provider_endpoint_attempt_success(
1192 &self,
1193 service_name: &str,
1194 endpoint_key: ProviderEndpointKey,
1195 now_ms: u64,
1196 ) {
1197 let mut guard = self.provider_endpoint_runtime_health.write().await;
1198 let entry = guard
1199 .entry(service_name.to_string())
1200 .or_default()
1201 .entry(endpoint_key)
1202 .or_default();
1203 entry.failure_count = 0;
1204 entry.cooldown_until = None;
1205 entry.penalty_streak = 0;
1206 entry.last_good_at_ms = Some(now_ms);
1207 }
1208
    /// Count one failed attempt against a provider endpoint; once the failure
    /// threshold is reached, place it into (or extend) a cooldown with
    /// streak-based backoff.
    pub async fn record_provider_endpoint_attempt_failure(
        &self,
        service_name: &str,
        endpoint_key: ProviderEndpointKey,
        failure_threshold_cooldown_secs: u64,
        cooldown_backoff: CooldownBackoff,
    ) {
        let mut guard = self.provider_endpoint_runtime_health.write().await;
        let entry = guard
            .entry(service_name.to_string())
            .or_default()
            .entry(endpoint_key)
            .or_default();

        entry.failure_count = entry.failure_count.saturating_add(1);
        if entry.failure_count >= FAILURE_THRESHOLD {
            // A configured value of 0 means "use the global default".
            let base_secs = if failure_threshold_cooldown_secs == 0 {
                COOLDOWN_SECS
            } else {
                failure_threshold_cooldown_secs
            };
            let effective_secs =
                cooldown_backoff.effective_cooldown_secs(base_secs, entry.penalty_streak);
            let now = std::time::Instant::now();
            let new_until = now + std::time::Duration::from_secs(effective_secs);
            // Only ever extend an existing cooldown, never shorten it.
            if entry
                .cooldown_until
                .is_none_or(|existing| new_until > existing)
            {
                entry.cooldown_until = Some(new_until);
            }
            entry.penalty_streak = entry.penalty_streak.saturating_add(1);
            // Entering cooldown forfeits affinity for this endpoint.
            entry.last_good_at_ms = None;
        }
    }
1244
1245 pub async fn penalize_provider_endpoint_attempt(
1246 &self,
1247 service_name: &str,
1248 endpoint_key: ProviderEndpointKey,
1249 cooldown_secs: u64,
1250 cooldown_backoff: CooldownBackoff,
1251 ) {
1252 let mut guard = self.provider_endpoint_runtime_health.write().await;
1253 let entry = guard
1254 .entry(service_name.to_string())
1255 .or_default()
1256 .entry(endpoint_key)
1257 .or_default();
1258 let effective_secs =
1259 cooldown_backoff.effective_cooldown_secs(cooldown_secs, entry.penalty_streak);
1260 entry.failure_count = FAILURE_THRESHOLD;
1261 entry.cooldown_until =
1262 Some(std::time::Instant::now() + std::time::Duration::from_secs(effective_secs));
1263 entry.penalty_streak = entry.penalty_streak.saturating_add(1);
1264 entry.last_good_at_ms = None;
1265 }
1266
1267 pub async fn set_provider_endpoint_usage_exhausted(
1268 &self,
1269 service_name: &str,
1270 endpoint_key: ProviderEndpointKey,
1271 exhausted: bool,
1272 ) {
1273 let mut guard = self.provider_endpoint_runtime_health.write().await;
1274 let entry = guard
1275 .entry(service_name.to_string())
1276 .or_default()
1277 .entry(endpoint_key)
1278 .or_default();
1279 entry.usage_exhausted = exhausted;
1280 }
1281
1282 pub async fn prune_runtime_observability_for_service(
1283 &self,
1284 service_name: &str,
1285 mgr: &ServiceConfigManager,
1286 ) {
1287 let active_stations = mgr.stations().keys().cloned().collect::<HashSet<_>>();
1288 let active_upstreams = mgr
1289 .stations()
1290 .iter()
1291 .map(|(station_name, service)| {
1292 (
1293 station_name.clone(),
1294 service
1295 .upstreams
1296 .iter()
1297 .map(|upstream| upstream.base_url.clone())
1298 .collect::<HashSet<_>>(),
1299 )
1300 })
1301 .collect::<HashMap<_, _>>();
1302 let active_base_urls = active_upstreams
1303 .values()
1304 .flat_map(|upstreams| upstreams.iter().cloned())
1305 .collect::<HashSet<_>>();
1306 let active_provider_endpoint_keys = mgr
1307 .stations()
1308 .iter()
1309 .flat_map(|(station_name, service)| {
1310 service.upstreams.iter().enumerate().map(|(idx, upstream)| {
1311 Self::active_provider_endpoint_key_for_upstream(
1312 service_name,
1313 station_name.as_str(),
1314 idx,
1315 upstream,
1316 )
1317 })
1318 })
1319 .collect::<HashSet<_>>();
1320 let mut active_provider_ids = HashSet::from(["-".to_string()]);
1321 for service in mgr.stations().values() {
1322 for upstream in &service.upstreams {
1323 if let Some(provider_id) = upstream.tags.get("provider_id") {
1324 active_provider_ids.insert(provider_id.clone());
1325 }
1326 }
1327 }
1328
1329 let layout = service_layout_signature(mgr);
1330 let balance_prune_stations = {
1331 let mut signatures = self.service_layout_signatures.write().await;
1332 let changed = signatures.get(service_name).map_or_else(
1333 || {
1334 if active_stations.len() == 1 && active_stations.contains("routing") {
1335 Some(HashSet::from(["routing".to_string()]))
1336 } else {
1337 None
1338 }
1339 },
1340 |previous| Some(changed_service_layout_stations(previous, &layout)),
1341 );
1342 signatures.insert(service_name.to_string(), layout);
1343 changed
1344 };
1345
1346 match balance_prune_stations {
1347 Some(changed_layout_stations) if !changed_layout_stations.is_empty() => {
1348 let mut provider_balances = self.provider_balances.write().await;
1349 if let Some(per_service) = provider_balances.get_mut(service_name) {
1350 per_service
1351 .retain(|station_name, _| !changed_layout_stations.contains(station_name));
1352 if per_service.is_empty() {
1353 provider_balances.remove(service_name);
1354 }
1355 }
1356 let mut provider_balance_summaries = self.provider_balance_summaries.write().await;
1357 if let Some(per_service) = provider_balance_summaries.get_mut(service_name) {
1358 per_service
1359 .retain(|station_name, _| !changed_layout_stations.contains(station_name));
1360 if per_service.is_empty() {
1361 provider_balance_summaries.remove(service_name);
1362 }
1363 }
1364 }
1365 None => {
1366 let mut provider_balances = self.provider_balances.write().await;
1367 provider_balances.remove(service_name);
1368 let mut provider_balance_summaries = self.provider_balance_summaries.write().await;
1369 provider_balance_summaries.remove(service_name);
1370 }
1371 Some(_) => {}
1372 }
1373
1374 {
1375 let mut guard = self.station_meta_overrides.write().await;
1376 if let Some(per_service) = guard.get_mut(service_name) {
1377 per_service.retain(|station_name, _| active_stations.contains(station_name));
1378 if per_service.is_empty() {
1379 guard.remove(service_name);
1380 }
1381 }
1382 }
1383
1384 {
1385 let mut guard = self.upstream_meta_overrides.write().await;
1386 if let Some(per_service) = guard.get_mut(service_name) {
1387 per_service.retain(|base_url, _| active_base_urls.contains(base_url));
1388 if per_service.is_empty() {
1389 guard.remove(service_name);
1390 }
1391 }
1392 }
1393
1394 {
1395 let mut guard = self.provider_endpoint_meta_overrides.write().await;
1396 if let Some(per_service) = guard.get_mut(service_name) {
1397 per_service
1398 .retain(|endpoint_key, _| active_provider_endpoint_keys.contains(endpoint_key));
1399 if per_service.is_empty() {
1400 guard.remove(service_name);
1401 }
1402 }
1403 }
1404
1405 {
1406 let mut guard = self.provider_endpoint_runtime_health.write().await;
1407 if let Some(per_service) = guard.get_mut(service_name) {
1408 per_service
1409 .retain(|endpoint_key, _| active_provider_endpoint_keys.contains(endpoint_key));
1410 if per_service.is_empty() {
1411 guard.remove(service_name);
1412 }
1413 }
1414 }
1415
1416 {
1417 let mut guard = self.station_health.write().await;
1418 if let Some(per_service) = guard.get_mut(service_name) {
1419 per_service.retain(|station_name, station_health| {
1420 if !active_stations.contains(station_name) {
1421 return false;
1422 }
1423 if let Some(allowed_upstreams) = active_upstreams.get(station_name) {
1424 station_health
1425 .upstreams
1426 .retain(|upstream| allowed_upstreams.contains(&upstream.base_url));
1427 }
1428 !station_health.upstreams.is_empty()
1429 });
1430 if per_service.is_empty() {
1431 guard.remove(service_name);
1432 }
1433 }
1434 }
1435
1436 {
1437 let mut guard = self.passive_station_health.write().await;
1438 if let Some(per_service) = guard.get_mut(service_name) {
1439 per_service.retain(|station_name, station_health| {
1440 if !active_stations.contains(station_name) {
1441 return false;
1442 }
1443 if let Some(allowed_upstreams) = active_upstreams.get(station_name) {
1444 station_health.retain(|base_url, _| allowed_upstreams.contains(base_url));
1445 }
1446 !station_health.is_empty()
1447 });
1448 if per_service.is_empty() {
1449 guard.remove(service_name);
1450 }
1451 }
1452 }
1453
1454 {
1455 let mut guard = self.station_health_checks.write().await;
1456 if let Some(per_service) = guard.get_mut(service_name) {
1457 per_service.retain(|station_name, _| active_stations.contains(station_name));
1458 if per_service.is_empty() {
1459 guard.remove(service_name);
1460 }
1461 }
1462 }
1463
1464 {
1465 let mut guard = self.usage_rollups.write().await;
1466 if let Some(rollup) = guard.get_mut(service_name) {
1467 rollup
1468 .by_config
1469 .retain(|station_name, _| active_stations.contains(station_name));
1470 rollup.by_config_day.retain(|station_name, _day_map| {
1471 if !active_stations.contains(station_name) {
1472 return false;
1473 }
1474 true
1475 });
1476 rollup
1477 .by_provider
1478 .retain(|provider_id, _| active_provider_ids.contains(provider_id));
1479 rollup.by_provider_day.retain(|provider_id, _day_map| {
1480 if !active_provider_ids.contains(provider_id) {
1481 return false;
1482 }
1483 true
1484 });
1485 }
1486 }
1487 }
1488
1489 fn active_provider_endpoint_key_for_upstream(
1490 service_name: &str,
1491 station_name: &str,
1492 upstream_index: usize,
1493 upstream: &crate::config::UpstreamConfig,
1494 ) -> ProviderEndpointKey {
1495 let provider_id = upstream
1496 .tags
1497 .get("provider_id")
1498 .cloned()
1499 .unwrap_or_else(|| format!("{station_name}#{upstream_index}"));
1500 let endpoint_id = upstream
1501 .tags
1502 .get("endpoint_id")
1503 .cloned()
1504 .unwrap_or_else(|| upstream_index.to_string());
1505
1506 ProviderEndpointKey::new(service_name, provider_id, endpoint_id)
1507 }
1508
1509 pub async fn record_station_health(
1510 &self,
1511 service_name: &str,
1512 station_name: String,
1513 health: StationHealth,
1514 ) {
1515 let mut guard = self.station_health.write().await;
1516 let per_service = guard.entry(service_name.to_string()).or_default();
1517 per_service.insert(station_name, health);
1518 }
1519
1520 pub async fn get_station_health(&self, service_name: &str) -> HashMap<String, StationHealth> {
1521 let active = {
1522 let guard = self.station_health.read().await;
1523 guard.get(service_name).cloned().unwrap_or_default()
1524 };
1525 let passive = {
1526 let guard = self.passive_station_health.read().await;
1527 guard.get(service_name).cloned().unwrap_or_default()
1528 };
1529 merge_station_health(active, passive)
1530 }
1531
    /// Store a provider balance snapshot and recompute the station's routing
    /// balance summary. Snapshots lacking a station name or upstream index
    /// cannot be keyed and are dropped silently.
    pub async fn record_provider_balance_snapshot(
        &self,
        service_name: &str,
        mut snapshot: ProviderBalanceSnapshot,
    ) {
        let (Some(station_name), Some(upstream_index)) =
            (snapshot.station_name.clone(), snapshot.upstream_index)
        else {
            return;
        };
        let now_ms = unix_now_ms();
        snapshot.refresh_status(now_ms);

        // Insert and summarize under the same balances lock so the summary
        // cannot drift from the snapshots it was computed over.
        let station_summary = {
            let mut guard = self.provider_balances.write().await;
            let station_balances = guard
                .entry(service_name.to_string())
                .or_default()
                .entry(station_name.clone())
                .or_default();
            station_balances
                .entry(upstream_index)
                .or_default()
                .insert(snapshot.provider_id.clone(), snapshot);
            StationRoutingBalanceSummary::from_snapshot_iter_at(
                station_balances
                    .values()
                    .flat_map(|providers| providers.values()),
                now_ms,
            )
        };

        let mut summaries = self.provider_balance_summaries.write().await;
        summaries
            .entry(service_name.to_string())
            .or_default()
            .insert(station_name, station_summary);
    }
1570
1571 pub async fn get_provider_balance_view(
1572 &self,
1573 service_name: &str,
1574 ) -> HashMap<String, Vec<ProviderBalanceSnapshot>> {
1575 let now_ms = unix_now_ms();
1576 let guard = self.provider_balances.read().await;
1577 let Some(per_service) = guard.get(service_name) else {
1578 return HashMap::new();
1579 };
1580
1581 per_service
1582 .iter()
1583 .map(|(station_name, upstreams)| {
1584 let mut snapshots = upstreams
1585 .values()
1586 .flat_map(|providers| providers.values().cloned())
1587 .collect::<Vec<_>>();
1588 for snapshot in &mut snapshots {
1589 snapshot.refresh_status(now_ms);
1590 }
1591 snapshots.sort_by(|a, b| {
1592 a.upstream_index
1593 .cmp(&b.upstream_index)
1594 .then_with(|| a.provider_id.cmp(&b.provider_id))
1595 });
1596 (station_name.clone(), snapshots)
1597 })
1598 .collect()
1599 }
1600
1601 pub async fn get_provider_balance_summary_view(
1602 &self,
1603 service_name: &str,
1604 ) -> HashMap<String, StationRoutingBalanceSummary> {
1605 let guard = self.provider_balance_summaries.read().await;
1606 let Some(per_service) = guard.get(service_name) else {
1607 return HashMap::new();
1608 };
1609
1610 per_service.clone()
1611 }
1612
1613 pub async fn record_passive_upstream_success(
1614 &self,
1615 service_name: &str,
1616 station_name: &str,
1617 base_url: &str,
1618 status_code: Option<u16>,
1619 now_ms: u64,
1620 ) {
1621 let mut guard = self.passive_station_health.write().await;
1622 let entry = guard
1623 .entry(service_name.to_string())
1624 .or_default()
1625 .entry(station_name.to_string())
1626 .or_default()
1627 .entry(base_url.to_string())
1628 .or_default();
1629 entry.record_success(now_ms, status_code);
1630 }
1631
1632 pub async fn record_passive_upstream_failure(&self, params: PassiveUpstreamFailureRecord) {
1633 let PassiveUpstreamFailureRecord {
1634 service_name,
1635 station_name,
1636 base_url,
1637 status_code,
1638 error_class,
1639 error,
1640 now_ms,
1641 } = params;
1642
1643 let mut guard = self.passive_station_health.write().await;
1644 let entry = guard
1645 .entry(service_name)
1646 .or_default()
1647 .entry(station_name)
1648 .or_default()
1649 .entry(base_url)
1650 .or_default();
1651 entry.record_failure(now_ms, status_code, error_class, error);
1652 }
1653
    /// Snapshot the legacy load-balancer state as a per-config view.
    ///
    /// Normalizes the parallel state vectors to a common length in place and
    /// converts absolute cooldown deadlines into remaining whole seconds.
    pub async fn get_lb_view(&self) -> HashMap<String, LbConfigView> {
        let Some(lb_states) = self.lb_states.as_ref() else {
            return HashMap::new();
        };
        // Recover the inner value from a poisoned mutex instead of panicking;
        // a stale view is preferable to crashing a read path.
        let mut map = match lb_states.lock() {
            Ok(m) => m,
            Err(e) => e.into_inner(),
        };

        let now = std::time::Instant::now();
        let mut out = HashMap::new();
        for (cfg_name, st) in map.iter_mut() {
            // The three vectors are kept parallel; treat the longest as the
            // true upstream count.
            let len = st
                .failure_counts
                .len()
                .max(st.cooldown_until.len())
                .max(st.usage_exhausted.len());
            if len == 0 {
                continue;
            }

            // Pad any short vector so per-index reads below are uniform.
            if st.failure_counts.len() != len {
                st.failure_counts.resize(len, 0);
            }
            if st.cooldown_until.len() != len {
                st.cooldown_until.resize(len, None);
            }
            if st.usage_exhausted.len() != len {
                st.usage_exhausted.resize(len, false);
            }

            let mut upstreams = Vec::with_capacity(len);
            for idx in 0..len {
                let failure_count = st.failure_counts.get(idx).copied().unwrap_or(0);
                // Remaining cooldown in seconds; omitted once it rounds to 0.
                let cooldown_remaining_secs = st
                    .cooldown_until
                    .get(idx)
                    .and_then(|v| *v)
                    .map(|until| until.saturating_duration_since(now).as_secs())
                    .filter(|&s| s > 0);
                let usage_exhausted = st.usage_exhausted.get(idx).copied().unwrap_or(false);
                upstreams.push(LbUpstreamView {
                    failure_count,
                    cooldown_remaining_secs,
                    usage_exhausted,
                });
            }

            out.insert(
                cfg_name.clone(),
                LbConfigView {
                    last_good_index: st.last_good_index,
                    upstreams,
                },
            );
        }
        out
    }
1713
1714 pub async fn list_health_checks(
1715 &self,
1716 service_name: &str,
1717 ) -> HashMap<String, HealthCheckStatus> {
1718 let guard = self.station_health_checks.read().await;
1719 guard.get(service_name).cloned().unwrap_or_default()
1720 }
1721
1722 pub async fn try_begin_station_health_check(
1723 &self,
1724 service_name: &str,
1725 station_name: &str,
1726 total: usize,
1727 now_ms: u64,
1728 ) -> bool {
1729 let mut guard = self.station_health_checks.write().await;
1730 let per_service = guard.entry(service_name.to_string()).or_default();
1731 if let Some(existing) = per_service.get(station_name)
1732 && !existing.done
1733 {
1734 return false;
1735 }
1736 per_service.insert(
1737 station_name.to_string(),
1738 HealthCheckStatus {
1739 started_at_ms: now_ms,
1740 updated_at_ms: now_ms,
1741 total: total.min(u32::MAX as usize) as u32,
1742 completed: 0,
1743 ok: 0,
1744 err: 0,
1745 cancel_requested: false,
1746 canceled: false,
1747 done: false,
1748 last_error: None,
1749 },
1750 );
1751 true
1752 }
1753
1754 pub async fn request_cancel_station_health_check(
1755 &self,
1756 service_name: &str,
1757 station_name: &str,
1758 now_ms: u64,
1759 ) -> bool {
1760 let mut guard = self.station_health_checks.write().await;
1761 let Some(per_service) = guard.get_mut(service_name) else {
1762 return false;
1763 };
1764 let Some(st) = per_service.get_mut(station_name) else {
1765 return false;
1766 };
1767 if st.done {
1768 return false;
1769 }
1770 st.cancel_requested = true;
1771 st.updated_at_ms = now_ms;
1772 true
1773 }
1774
1775 pub async fn is_station_health_check_cancel_requested(
1776 &self,
1777 service_name: &str,
1778 station_name: &str,
1779 ) -> bool {
1780 let guard = self.station_health_checks.read().await;
1781 guard
1782 .get(service_name)
1783 .and_then(|m| m.get(station_name))
1784 .is_some_and(|s| s.cancel_requested && !s.done)
1785 }
1786
1787 pub async fn record_station_health_check_result(
1788 &self,
1789 service_name: &str,
1790 station_name: &str,
1791 now_ms: u64,
1792 upstream: UpstreamHealth,
1793 ) {
1794 {
1795 let mut guard = self.station_health.write().await;
1796 let per_service = guard.entry(service_name.to_string()).or_default();
1797 let entry = per_service
1798 .entry(station_name.to_string())
1799 .or_insert_with(|| StationHealth {
1800 checked_at_ms: now_ms,
1801 upstreams: Vec::new(),
1802 });
1803 entry.checked_at_ms = entry.checked_at_ms.max(now_ms);
1804 entry.upstreams.push(upstream.clone());
1805 if entry.upstreams.len() > Self::MAX_HEALTH_RECORDS_PER_STATION {
1806 let extra = entry
1807 .upstreams
1808 .len()
1809 .saturating_sub(Self::MAX_HEALTH_RECORDS_PER_STATION);
1810 if extra > 0 {
1811 entry.upstreams.drain(0..extra);
1812 }
1813 }
1814 }
1815
1816 let mut guard = self.station_health_checks.write().await;
1817 let per_service = guard.entry(service_name.to_string()).or_default();
1818 let st = per_service.entry(station_name.to_string()).or_default();
1819 st.updated_at_ms = now_ms;
1820 st.completed = st.completed.saturating_add(1);
1821 match upstream.ok {
1822 Some(true) => st.ok = st.ok.saturating_add(1),
1823 Some(false) => {
1824 st.err = st.err.saturating_add(1);
1825 if st.last_error.is_none() {
1826 st.last_error = upstream.error.clone();
1827 }
1828 }
1829 None => {}
1830 }
1831 }
1832
1833 pub async fn finish_station_health_check(
1834 &self,
1835 service_name: &str,
1836 station_name: &str,
1837 now_ms: u64,
1838 canceled: bool,
1839 ) {
1840 let mut guard = self.station_health_checks.write().await;
1841 let per_service = guard.entry(service_name.to_string()).or_default();
1842 let st = per_service.entry(station_name.to_string()).or_default();
1843 st.updated_at_ms = now_ms;
1844 st.canceled = canceled;
1845 st.done = true;
1846 }
1847
1848 pub async fn get_usage_rollup_view(
1849 &self,
1850 service_name: &str,
1851 top_n: usize,
1852 days: usize,
1853 ) -> UsageRollupView {
1854 let guard = self.usage_rollups.read().await;
1855 let Some(rollup) = guard.get(service_name) else {
1856 return UsageRollupView::default();
1857 };
1858
1859 fn now_day() -> i32 {
1860 std::time::SystemTime::now()
1861 .duration_since(std::time::UNIX_EPOCH)
1862 .map(|d| (d.as_millis() / 86_400_000) as i32)
1863 .unwrap_or(0)
1864 }
1865
1866 fn sorted_day_series(map: &HashMap<i32, UsageBucket>) -> Vec<(i32, UsageBucket)> {
1867 let mut out = map.iter().map(|(k, v)| (*k, v.clone())).collect::<Vec<_>>();
1868 out.sort_by_key(|(k, _)| *k);
1869 out
1870 }
1871
1872 fn filled_day_series(
1873 map: &HashMap<i32, UsageBucket>,
1874 start_day: i32,
1875 end_day: i32,
1876 ) -> Vec<(i32, UsageBucket)> {
1877 if start_day > end_day {
1878 return Vec::new();
1879 }
1880 (start_day..=end_day)
1881 .map(|day| (day, map.get(&day).cloned().unwrap_or_default()))
1882 .collect()
1883 }
1884
1885 fn sum_series(series: &[(i32, UsageBucket)]) -> UsageBucket {
1886 let mut out = UsageBucket::default();
1887 for (_, bucket) in series {
1888 out.add_assign(bucket);
1889 }
1890 out
1891 }
1892
1893 fn aggregate_entity_window(
1894 source: &HashMap<String, HashMap<i32, UsageBucket>>,
1895 start_day: Option<i32>,
1896 end_day: Option<i32>,
1897 top_n: usize,
1898 ) -> Vec<(String, UsageBucket)> {
1899 let mut out = Vec::new();
1900 for (name, days) in source {
1901 let mut bucket = UsageBucket::default();
1902 for (day, value) in days {
1903 let include = match (start_day, end_day) {
1904 (Some(start), Some(end)) => *day >= start && *day <= end,
1905 _ => true,
1906 };
1907 if include {
1908 bucket.add_assign(value);
1909 }
1910 }
1911 if bucket.requests_total > 0 {
1912 out.push((name.clone(), bucket));
1913 }
1914 }
1915 out.sort_by(|(left_name, left), (right_name, right)| {
1916 right
1917 .usage
1918 .total_tokens
1919 .cmp(&left.usage.total_tokens)
1920 .then_with(|| right.requests_total.cmp(&left.requests_total))
1921 .then_with(|| left_name.cmp(right_name))
1922 });
1923 out.truncate(top_n);
1924 out
1925 }
1926
1927 let all_loaded = days == 0;
1928 let loaded_first_day = rollup.by_day.keys().min().copied();
1929 let loaded_last_day = rollup.by_day.keys().max().copied();
1930 let loaded_days_with_data = rollup
1931 .by_day
1932 .values()
1933 .filter(|bucket| bucket.requests_total > 0)
1934 .count();
1935
1936 let (start_day, end_day) = if all_loaded {
1937 (loaded_first_day, loaded_last_day)
1938 } else {
1939 let end = now_day();
1940 let offset = i32::try_from(days.saturating_sub(1)).unwrap_or(i32::MAX);
1941 (Some(end.saturating_sub(offset)), Some(end))
1942 };
1943
1944 let by_day = match (all_loaded, start_day, end_day) {
1945 (true, _, _) => sorted_day_series(&rollup.by_day),
1946 (false, Some(start), Some(end)) => filled_day_series(&rollup.by_day, start, end),
1947 _ => Vec::new(),
1948 };
1949 let window = if all_loaded {
1950 rollup.loaded.clone()
1951 } else {
1952 sum_series(&by_day)
1953 };
1954
1955 let mut by_config =
1956 aggregate_entity_window(&rollup.by_config_day, start_day, end_day, top_n);
1957 if all_loaded && by_config.is_empty() {
1958 by_config = rollup
1959 .by_config
1960 .iter()
1961 .map(|(k, v)| (k.clone(), v.clone()))
1962 .collect::<Vec<_>>();
1963 by_config.sort_by(|(left_name, left), (right_name, right)| {
1964 right
1965 .usage
1966 .total_tokens
1967 .cmp(&left.usage.total_tokens)
1968 .then_with(|| right.requests_total.cmp(&left.requests_total))
1969 .then_with(|| left_name.cmp(right_name))
1970 });
1971 by_config.truncate(top_n);
1972 }
1973
1974 let mut by_provider =
1975 aggregate_entity_window(&rollup.by_provider_day, start_day, end_day, top_n);
1976 if all_loaded && by_provider.is_empty() {
1977 by_provider = rollup
1978 .by_provider
1979 .iter()
1980 .map(|(k, v)| (k.clone(), v.clone()))
1981 .collect::<Vec<_>>();
1982 by_provider.sort_by(|(left_name, left), (right_name, right)| {
1983 right
1984 .usage
1985 .total_tokens
1986 .cmp(&left.usage.total_tokens)
1987 .then_with(|| right.requests_total.cmp(&left.requests_total))
1988 .then_with(|| left_name.cmp(right_name))
1989 });
1990 by_provider.truncate(top_n);
1991 }
1992
1993 let mut by_config_day = HashMap::new();
1994 for (name, _) in &by_config {
1995 let series = rollup
1996 .by_config_day
1997 .get(name)
1998 .map(|m| match (all_loaded, start_day, end_day) {
1999 (true, _, _) => sorted_day_series(m),
2000 (false, Some(start), Some(end)) => filled_day_series(m, start, end),
2001 _ => Vec::new(),
2002 })
2003 .unwrap_or_default();
2004 by_config_day.insert(name.clone(), series);
2005 }
2006
2007 let mut by_provider_day = HashMap::new();
2008 for (name, _) in &by_provider {
2009 let series = rollup
2010 .by_provider_day
2011 .get(name)
2012 .map(|m| match (all_loaded, start_day, end_day) {
2013 (true, _, _) => sorted_day_series(m),
2014 (false, Some(start), Some(end)) => filled_day_series(m, start, end),
2015 _ => Vec::new(),
2016 })
2017 .unwrap_or_default();
2018 by_provider_day.insert(name.clone(), series);
2019 }
2020
2021 let window_days_with_data = by_day
2022 .iter()
2023 .filter(|(_, bucket)| bucket.requests_total > 0)
2024 .count();
2025 let coverage = UsageRollupCoverage {
2026 requested_days: days,
2027 all_loaded,
2028 loaded_first_day,
2029 loaded_last_day,
2030 loaded_days_with_data,
2031 loaded_requests: rollup.loaded.requests_total,
2032 window_first_day: start_day,
2033 window_last_day: end_day,
2034 window_days_with_data,
2035 window_requests: window.requests_total,
2036 window_exceeds_loaded_start: matches!(
2037 (all_loaded, start_day, loaded_first_day),
2038 (false, Some(start), Some(first)) if start < first
2039 ),
2040 };
2041
2042 UsageRollupView {
2043 loaded: rollup.loaded.clone(),
2044 window,
2045 coverage,
2046 by_day,
2047 by_config,
2048 by_config_day,
2049 by_provider,
2050 by_provider_day,
2051 }
2052 }
2053
2054 pub async fn replay_usage_from_requests_log(
2055 &self,
2056 service_name: &str,
2057 log_path: PathBuf,
2058 base_url_to_provider_id: HashMap<String, String>,
2059 ) -> usize {
2060 let enabled = std::env::var("CODEX_HELPER_USAGE_REPLAY_ON_STARTUP")
2061 .ok()
2062 .map(|v| {
2063 matches!(
2064 v.trim().to_ascii_lowercase().as_str(),
2065 "1" | "true" | "yes" | "y" | "on"
2066 )
2067 })
2068 .unwrap_or(true);
2069 if !enabled {
2070 return 0;
2071 }
2072
2073 let already_has_data = {
2074 let guard = self.usage_rollups.read().await;
2075 guard
2076 .get(service_name)
2077 .is_some_and(|r| r.loaded.requests_total > 0)
2078 };
2079 if already_has_data {
2080 return 0;
2081 }
2082
2083 if !log_path.exists() {
2084 return 0;
2085 }
2086
2087 let max_bytes = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_BYTES")
2088 .ok()
2089 .and_then(|s| s.trim().parse::<usize>().ok())
2090 .filter(|&n| n > 0)
2091 .unwrap_or(8 * 1024 * 1024);
2092 let max_lines = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_LINES")
2093 .ok()
2094 .and_then(|s| s.trim().parse::<usize>().ok())
2095 .filter(|&n| n > 0)
2096 .unwrap_or(20_000);
2097
2098 let mut file = match std::fs::File::open(&log_path) {
2099 Ok(f) => f,
2100 Err(_) => return 0,
2101 };
2102 let len: u64 = file.metadata().map(|m| m.len()).unwrap_or_default();
2103 let start = len.saturating_sub(max_bytes as u64);
2104 if file.seek(SeekFrom::Start(start)).is_err() {
2105 return 0;
2106 }
2107 let mut buf = Vec::new();
2108 if file.read_to_end(&mut buf).is_err() {
2109 return 0;
2110 }
2111 if start > 0 {
2112 if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
2113 buf = buf[pos + 1..].to_vec();
2114 } else {
2115 return 0;
2116 }
2117 }
2118
2119 let text = match std::str::from_utf8(&buf) {
2120 Ok(s) => s,
2121 Err(_) => return 0,
2122 };
2123 let lines = text
2124 .lines()
2125 .map(|l| l.trim())
2126 .filter(|l| !l.is_empty())
2127 .collect::<Vec<_>>();
2128 let start_idx = lines.len().saturating_sub(max_lines);
2129
2130 let mut events = Vec::new();
2131 for line in &lines[start_idx..] {
2132 let Ok(v) = serde_json::from_str::<JsonValue>(line) else {
2133 continue;
2134 };
2135 let Some(svc) = v.get("service").and_then(|x| x.as_str()) else {
2136 continue;
2137 };
2138 if svc != service_name {
2139 continue;
2140 }
2141
2142 let ended_at_ms = v.get("timestamp_ms").and_then(|x| x.as_u64()).unwrap_or(0);
2143 let status_code = v.get("status_code").and_then(|x| x.as_u64()).unwrap_or(0) as u16;
2144 let duration_ms = v.get("duration_ms").and_then(|x| x.as_u64()).unwrap_or(0);
2145 let station_name = v
2146 .get("station_name")
2147 .and_then(|x| x.as_str())
2148 .unwrap_or("-")
2149 .to_string();
2150 let upstream_base_url = v
2151 .get("upstream_base_url")
2152 .and_then(|x| x.as_str())
2153 .unwrap_or("-")
2154 .to_string();
2155 let provider_id = v
2156 .get("provider_id")
2157 .and_then(|x| x.as_str())
2158 .map(|s| s.to_string())
2159 .or_else(|| base_url_to_provider_id.get(&upstream_base_url).cloned())
2160 .unwrap_or_else(|| "-".to_string());
2161 let usage = v
2162 .get("usage")
2163 .and_then(|u| serde_json::from_value::<UsageMetrics>(u.clone()).ok());
2164 let ttfb_ms = v.get("ttfb_ms").and_then(|x| x.as_u64());
2165
2166 events.push((
2167 ended_at_ms,
2168 status_code,
2169 duration_ms,
2170 station_name,
2171 provider_id,
2172 usage,
2173 ttfb_ms,
2174 ));
2175 }
2176
2177 if events.is_empty() {
2178 return 0;
2179 }
2180
2181 let mut guard = self.usage_rollups.write().await;
2182 let rollup = guard.entry(service_name.to_string()).or_default();
2183 for (ended_at_ms, status_code, duration_ms, cfg_key, provider_key, usage, ttfb_ms) in
2184 events.iter()
2185 {
2186 let day = (*ended_at_ms / 86_400_000) as i32;
2187 rollup
2188 .loaded
2189 .record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
2190 rollup.by_day.entry(day).or_default().record(
2191 *status_code,
2192 *duration_ms,
2193 usage.as_ref(),
2194 None,
2195 *ttfb_ms,
2196 );
2197 rollup.by_config.entry(cfg_key.clone()).or_default().record(
2198 *status_code,
2199 *duration_ms,
2200 usage.as_ref(),
2201 None,
2202 *ttfb_ms,
2203 );
2204 rollup
2205 .by_config_day
2206 .entry(cfg_key.clone())
2207 .or_default()
2208 .entry(day)
2209 .or_default()
2210 .record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
2211 rollup
2212 .by_provider
2213 .entry(provider_key.clone())
2214 .or_default()
2215 .record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
2216 rollup
2217 .by_provider_day
2218 .entry(provider_key.clone())
2219 .or_default()
2220 .entry(day)
2221 .or_default()
2222 .record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
2223 }
2224
2225 events.len()
2226 }
2227
2228 pub async fn resolve_session_cwd(&self, session_id: &str) -> Option<String> {
2229 if self.session_cwd_cache_max_entries == 0 {
2230 return sessions::find_codex_session_cwd_by_id(session_id)
2231 .await
2232 .ok()
2233 .flatten();
2234 }
2235
2236 let now_ms = std::time::SystemTime::now()
2237 .duration_since(std::time::UNIX_EPOCH)
2238 .map(|d| d.as_millis() as u64)
2239 .unwrap_or(0);
2240
2241 {
2242 let guard = self.session_cwd_cache.read().await;
2243 if let Some(v) = guard.get(session_id) {
2244 let out = v.cwd.clone();
2245 drop(guard);
2246 let mut guard = self.session_cwd_cache.write().await;
2247 if let Some(v) = guard.get_mut(session_id) {
2248 v.last_seen_ms = now_ms;
2249 }
2250 return out;
2251 }
2252 }
2253
2254 let resolved = sessions::find_codex_session_cwd_by_id(session_id)
2257 .await
2258 .ok()
2259 .flatten();
2260
2261 let mut guard = self.session_cwd_cache.write().await;
2262 guard.insert(
2263 session_id.to_string(),
2264 SessionCwdCacheEntry {
2265 cwd: resolved.clone(),
2266 last_seen_ms: now_ms,
2267 },
2268 );
2269 resolved
2270 }
2271
2272 #[allow(clippy::too_many_arguments)]
2273 pub async fn begin_request(
2274 &self,
2275 service: &str,
2276 method: &str,
2277 path: &str,
2278 session_id: Option<String>,
2279 client_name: Option<String>,
2280 client_addr: Option<String>,
2281 cwd: Option<String>,
2282 model: Option<String>,
2283 reasoning_effort: Option<String>,
2284 service_tier: Option<String>,
2285 started_at_ms: u64,
2286 ) -> u64 {
2287 let id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
2288 let trace_id = Some(crate::logging::request_trace_id(service, id));
2289 let req = ActiveRequest {
2290 id,
2291 trace_id,
2292 session_id,
2293 client_name,
2294 client_addr,
2295 cwd,
2296 model,
2297 reasoning_effort,
2298 service_tier,
2299 station_name: None,
2300 provider_id: None,
2301 upstream_base_url: None,
2302 route_decision: None,
2303 service: service.to_string(),
2304 method: method.to_string(),
2305 path: path.to_string(),
2306 started_at_ms,
2307 };
2308 let mut guard = self.active_requests.write().await;
2309 guard.insert(id, req);
2310 id
2311 }
2312
2313 pub async fn update_request_route(
2314 &self,
2315 request_id: u64,
2316 station_name: Option<String>,
2317 provider_id: Option<String>,
2318 upstream_base_url: String,
2319 route_decision: Option<RouteDecisionProvenance>,
2320 ) {
2321 let mut guard = self.active_requests.write().await;
2322 let Some(req) = guard.get_mut(&request_id) else {
2323 return;
2324 };
2325 req.station_name = station_name;
2326 req.provider_id = provider_id;
2327 req.upstream_base_url = Some(upstream_base_url);
2328 req.route_decision = route_decision;
2329 }
2330
2331 pub async fn finish_request(&self, params: FinishRequestParams) {
2332 let mut active = self.active_requests.write().await;
2333 let Some(req) = active.remove(¶ms.id) else {
2334 return;
2335 };
2336
2337 let pricing_model = req
2338 .route_decision
2339 .as_ref()
2340 .and_then(|decision| decision.effective_model.as_ref())
2341 .map(|value| value.value.as_str())
2342 .or(req.model.as_deref());
2343 let cost = estimate_request_cost_from_operator_catalog_for_service(
2344 pricing_model,
2345 params.usage.as_ref(),
2346 CostAdjustments::default(),
2347 &req.service,
2348 );
2349
2350 let mut finished = FinishedRequest {
2351 id: params.id,
2352 trace_id: req.trace_id,
2353 session_id: req.session_id,
2354 client_name: req.client_name,
2355 client_addr: req.client_addr,
2356 cwd: req.cwd,
2357 model: req.model,
2358 reasoning_effort: req.reasoning_effort,
2359 service_tier: params.observed_service_tier.or(req.service_tier),
2360 station_name: req.station_name,
2361 provider_id: req.provider_id,
2362 upstream_base_url: req.upstream_base_url,
2363 route_decision: req.route_decision,
2364 usage: params.usage.clone(),
2365 cost,
2366 retry: params.retry,
2367 observability: RequestObservability::default(),
2368 service: req.service,
2369 method: req.method,
2370 path: req.path,
2371 status_code: params.status_code,
2372 duration_ms: params.duration_ms,
2373 ttfb_ms: params.ttfb_ms,
2374 streaming: params.streaming,
2375 ended_at_ms: params.ended_at_ms,
2376 };
2377 finished.refresh_observability();
2378
2379 {
2380 let day = (finished.ended_at_ms / 86_400_000) as i32;
2381 let cfg_key = finished
2382 .station_name
2383 .clone()
2384 .unwrap_or_else(|| "-".to_string());
2385 let provider_key = finished
2386 .provider_id
2387 .clone()
2388 .unwrap_or_else(|| "-".to_string());
2389
2390 let mut rollups = self.usage_rollups.write().await;
2391 let rollup = rollups.entry(finished.service.clone()).or_default();
2392 rollup.loaded.record(
2393 finished.status_code,
2394 finished.duration_ms,
2395 finished.usage.as_ref(),
2396 Some(&finished.cost),
2397 finished.ttfb_ms,
2398 );
2399 rollup.by_day.entry(day).or_default().record(
2400 finished.status_code,
2401 finished.duration_ms,
2402 finished.usage.as_ref(),
2403 Some(&finished.cost),
2404 finished.ttfb_ms,
2405 );
2406 rollup.by_config.entry(cfg_key.clone()).or_default().record(
2407 finished.status_code,
2408 finished.duration_ms,
2409 finished.usage.as_ref(),
2410 Some(&finished.cost),
2411 finished.ttfb_ms,
2412 );
2413 rollup
2414 .by_config_day
2415 .entry(cfg_key)
2416 .or_default()
2417 .entry(day)
2418 .or_default()
2419 .record(
2420 finished.status_code,
2421 finished.duration_ms,
2422 finished.usage.as_ref(),
2423 Some(&finished.cost),
2424 finished.ttfb_ms,
2425 );
2426
2427 rollup
2428 .by_provider
2429 .entry(provider_key.clone())
2430 .or_default()
2431 .record(
2432 finished.status_code,
2433 finished.duration_ms,
2434 finished.usage.as_ref(),
2435 Some(&finished.cost),
2436 finished.ttfb_ms,
2437 );
2438 rollup
2439 .by_provider_day
2440 .entry(provider_key)
2441 .or_default()
2442 .entry(day)
2443 .or_default()
2444 .record(
2445 finished.status_code,
2446 finished.duration_ms,
2447 finished.usage.as_ref(),
2448 Some(&finished.cost),
2449 finished.ttfb_ms,
2450 );
2451 }
2452
2453 if let Some(sid) = finished.session_id.as_deref() {
2454 let mut stats = self.session_stats.write().await;
2455 let entry = stats.entry(sid.to_string()).or_default();
2456 entry.turns_total = entry.turns_total.saturating_add(1);
2457 entry.last_client_name = finished
2458 .client_name
2459 .clone()
2460 .or(entry.last_client_name.clone());
2461 entry.last_client_addr = finished
2462 .client_addr
2463 .clone()
2464 .or(entry.last_client_addr.clone());
2465 entry.last_model = finished.model.clone().or(entry.last_model.clone());
2466 entry.last_reasoning_effort = finished
2467 .reasoning_effort
2468 .clone()
2469 .or(entry.last_reasoning_effort.clone());
2470 entry.last_service_tier = finished
2471 .service_tier
2472 .clone()
2473 .or(entry.last_service_tier.clone());
2474 entry.last_provider_id = finished
2475 .provider_id
2476 .clone()
2477 .or(entry.last_provider_id.clone());
2478 entry.last_station_name = finished
2479 .station_name
2480 .clone()
2481 .or(entry.last_station_name.clone());
2482 if finished.route_decision.is_some() {
2483 entry.last_route_decision = finished.route_decision.clone();
2484 }
2485 if let Some(u) = finished.usage.as_ref() {
2486 entry.last_usage = Some(u.clone());
2487 entry.total_usage.add_assign(u);
2488 entry.turns_with_usage = entry.turns_with_usage.saturating_add(1);
2489 }
2490 entry.last_status = Some(finished.status_code);
2491 entry.last_duration_ms = Some(finished.duration_ms);
2492 entry.last_ended_at_ms = Some(finished.ended_at_ms);
2493 entry.last_seen_ms = finished.ended_at_ms;
2494 }
2495
2496 let mut recent = self.recent_finished.write().await;
2497 recent.push_front(finished);
2498 while recent.len() > recent_finished_max() {
2499 recent.pop_back();
2500 }
2501 }
2502
2503 pub async fn list_active_requests(&self) -> Vec<ActiveRequest> {
2504 let guard = self.active_requests.read().await;
2505 let mut vec = guard.values().cloned().collect::<Vec<_>>();
2506 vec.sort_by_key(|r| r.started_at_ms);
2507 vec
2508 }
2509
2510 pub async fn list_recent_finished(&self, limit: usize) -> Vec<FinishedRequest> {
2511 let guard = self.recent_finished.read().await;
2512 guard.iter().take(limit).cloned().collect()
2513 }
2514
2515 pub async fn list_session_stats(&self) -> HashMap<String, SessionStats> {
2516 let guard = self.session_stats.read().await;
2517 guard.clone()
2518 }
2519
2520 pub async fn list_session_identity_cards(
2521 &self,
2522 recent_limit: usize,
2523 ) -> Vec<SessionIdentityCard> {
2524 let recent_limit = recent_limit.clamp(1, recent_finished_max());
2525 let (
2526 active,
2527 recent,
2528 overrides,
2529 station_overrides,
2530 model_overrides,
2531 service_tier_overrides,
2532 bindings,
2533 route_affinities,
2534 global_station_override,
2535 stats,
2536 ) = tokio::join!(
2537 self.list_active_requests(),
2538 self.list_recent_finished(recent_limit),
2539 self.list_session_effort_overrides(),
2540 self.list_session_station_overrides(),
2541 self.list_session_model_overrides(),
2542 self.list_session_service_tier_overrides(),
2543 self.list_session_bindings(),
2544 self.list_session_route_affinities(),
2545 self.get_global_station_override(),
2546 self.list_session_stats(),
2547 );
2548 build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
2549 active: &active,
2550 recent: &recent,
2551 overrides: &overrides,
2552 station_overrides: &station_overrides,
2553 model_overrides: &model_overrides,
2554 service_tier_overrides: &service_tier_overrides,
2555 bindings: &bindings,
2556 route_affinities: &route_affinities,
2557 global_station_override: global_station_override.as_deref(),
2558 stats: &stats,
2559 })
2560 }
2561
    /// Resolve host-local transcript file paths for a batch of session ids,
    /// backed by a TTL + LRU cache in front of the filesystem scan in
    /// `sessions::find_codex_session_files_by_ids`.
    ///
    /// Returns a map from session id to the resolved path. Not-found results
    /// are cached as `None`, so repeated lookups of unknown sessions do not
    /// rescan until the TTL expires. Input ids are trimmed and de-duplicated;
    /// empty ids are ignored.
    async fn resolve_host_transcript_paths_cached(
        &self,
        session_ids: &[String],
    ) -> HashMap<String, Option<String>> {
        // Normalize the requested ids: trim, drop empties, sort + dedup.
        let mut unique = session_ids
            .iter()
            .map(|sid| sid.trim())
            .filter(|sid| !sid.is_empty())
            .map(ToOwned::to_owned)
            .collect::<Vec<_>>();
        unique.sort();
        unique.dedup();
        if unique.is_empty() {
            return HashMap::new();
        }

        // Cache disabled: resolve everything directly from the filesystem.
        if self.session_transcript_path_cache_max_entries == 0 {
            return sessions::find_codex_session_files_by_ids(&unique)
                .await
                .unwrap_or_default()
                .into_iter()
                .map(|(sid, path)| (sid, Some(path.to_string_lossy().to_string())))
                .collect();
        }

        let now_ms = unix_now_ms();
        let ttl_ms = self.session_transcript_path_cache_ttl_ms;
        let mut resolved = HashMap::<String, Option<String>>::new();
        let mut stale_or_missing = Vec::<String>::new();

        // Phase 1 (read lock): serve fresh cache hits, collect the rest.
        {
            let cache = self.session_transcript_path_cache.read().await;
            for sid in &unique {
                // ttl_ms == 0 means cached results never go stale.
                let fresh = cache.get(sid).filter(|entry| {
                    ttl_ms == 0 || now_ms.saturating_sub(entry.last_checked_ms) <= ttl_ms
                });
                if let Some(entry) = fresh {
                    resolved.insert(sid.clone(), entry.path.clone());
                } else {
                    stale_or_missing.push(sid.clone());
                }
            }
        }

        // Phase 2 (write lock): resolve misses from the filesystem and write
        // the results — including not-found — back into the cache, then trim
        // the cache to capacity.
        if !stale_or_missing.is_empty() {
            let found = sessions::find_codex_session_files_by_ids(&stale_or_missing)
                .await
                .unwrap_or_default();
            let mut cache = self.session_transcript_path_cache.write().await;
            for sid in stale_or_missing {
                let path = found
                    .get(&sid)
                    .map(|path| path.to_string_lossy().to_string());
                cache.insert(
                    sid.clone(),
                    SessionTranscriptPathCacheEntry {
                        path: path.clone(),
                        last_checked_ms: now_ms,
                        last_seen_ms: now_ms,
                    },
                );
                resolved.insert(sid, path);
            }
            prune_lru_cache(
                &mut cache,
                self.session_transcript_path_cache_max_entries,
                |entry| entry.last_seen_ms,
            );
        }

        // Phase 3 (write lock): bump the LRU timestamp of every id touched by
        // this call so hot sessions are not evicted first.
        {
            let mut cache = self.session_transcript_path_cache.write().await;
            for sid in &unique {
                if let Some(entry) = cache.get_mut(sid) {
                    entry.last_seen_ms = now_ms;
                }
            }
        }

        resolved
    }
2643
2644 pub async fn enrich_session_identity_cards_with_cached_host_transcripts(
2645 &self,
2646 cards: &mut [SessionIdentityCard],
2647 ) {
2648 let session_ids = cards
2649 .iter()
2650 .filter_map(|card| card.session_id.as_deref())
2651 .map(str::to_owned)
2652 .collect::<Vec<_>>();
2653 let resolved = self
2654 .resolve_host_transcript_paths_cached(&session_ids)
2655 .await;
2656 for card in cards {
2657 card.host_local_transcript_path = card
2658 .session_id
2659 .as_deref()
2660 .and_then(|sid| resolved.get(sid))
2661 .and_then(Clone::clone);
2662 }
2663 }
2664
2665 pub async fn list_session_identity_cards_with_host_transcripts(
2666 &self,
2667 recent_limit: usize,
2668 ) -> Vec<SessionIdentityCard> {
2669 let mut cards = self.list_session_identity_cards(recent_limit).await;
2670 self.enrich_session_identity_cards_with_cached_host_transcripts(&mut cards)
2671 .await;
2672 cards
2673 }
2674
2675 pub fn spawn_cleanup_task(state: Arc<Self>) {
2676 tokio::spawn(async move {
2678 let mut tick = interval(Duration::from_secs(30));
2679 loop {
2680 tick.tick().await;
2681 state.prune_periodic().await;
2682 }
2683 });
2684 }
2685
2686 async fn prune_periodic(&self) {
2687 let now_ms = std::time::SystemTime::now()
2688 .duration_since(std::time::UNIX_EPOCH)
2689 .map(|d| d.as_millis() as u64)
2690 .unwrap_or(0);
2691
2692 let active = self.active_requests.read().await;
2694 let mut active_sessions: HashMap<String, ()> = HashMap::new();
2695 for req in active.values() {
2696 if let Some(sid) = req.session_id.as_deref() {
2697 active_sessions.insert(sid.to_string(), ());
2698 }
2699 }
2700
2701 if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2702 let cutoff_override = now_ms - self.session_override_ttl_ms;
2703 let mut overrides = self.session_effort_overrides.write().await;
2704 overrides.retain(|sid, v| {
2705 if active_sessions.contains_key(sid) {
2706 return true;
2707 }
2708 v.last_seen_ms >= cutoff_override
2709 });
2710 }
2711
2712 if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2713 let cutoff_override = now_ms - self.session_override_ttl_ms;
2714 let mut overrides = self.session_station_overrides.write().await;
2715 overrides.retain(|sid, v| {
2716 if active_sessions.contains_key(sid) {
2717 return true;
2718 }
2719 v.last_seen_ms >= cutoff_override
2720 });
2721 }
2722
2723 if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2724 let cutoff_override = now_ms - self.session_override_ttl_ms;
2725 let mut overrides = self.session_route_target_overrides.write().await;
2726 overrides.retain(|sid, v| {
2727 if active_sessions.contains_key(sid) {
2728 return true;
2729 }
2730 v.last_seen_ms >= cutoff_override
2731 });
2732 }
2733
2734 if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2735 let cutoff_override = now_ms - self.session_override_ttl_ms;
2736 let mut overrides = self.session_model_overrides.write().await;
2737 overrides.retain(|sid, v| {
2738 if active_sessions.contains_key(sid) {
2739 return true;
2740 }
2741 v.last_seen_ms >= cutoff_override
2742 });
2743 }
2744
2745 if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2746 let cutoff_override = now_ms - self.session_override_ttl_ms;
2747 let mut overrides = self.session_service_tier_overrides.write().await;
2748 overrides.retain(|sid, v| {
2749 if active_sessions.contains_key(sid) {
2750 return true;
2751 }
2752 v.last_seen_ms >= cutoff_override
2753 });
2754 }
2755
2756 if self.session_binding_ttl_ms > 0 && now_ms >= self.session_binding_ttl_ms {
2757 let cutoff_binding = now_ms - self.session_binding_ttl_ms;
2758 let mut bindings = self.session_bindings.write().await;
2759 bindings.retain(|sid, entry| {
2760 if active_sessions.contains_key(sid) {
2761 return true;
2762 }
2763 entry.binding.last_seen_ms >= cutoff_binding
2764 });
2765 }
2766 if self.session_binding_max_entries > 0 {
2767 let mut bindings = self.session_bindings.write().await;
2768 if bindings.len() > self.session_binding_max_entries {
2769 let mut removable = bindings
2770 .iter()
2771 .filter(|(sid, _)| !active_sessions.contains_key(*sid))
2772 .map(|(sid, entry)| (sid.clone(), entry.binding.last_seen_ms))
2773 .collect::<Vec<_>>();
2774 removable.sort_by_key(|(_, last_seen_ms)| *last_seen_ms);
2775 let remove_count = bindings
2776 .len()
2777 .saturating_sub(self.session_binding_max_entries)
2778 .min(removable.len());
2779 for (sid, _) in removable.into_iter().take(remove_count) {
2780 bindings.remove(&sid);
2781 }
2782 }
2783 }
2784
2785 {
2786 let mut affinities = self.session_route_affinities.write().await;
2787 if self.session_route_affinity_ttl_ms > 0
2788 && now_ms >= self.session_route_affinity_ttl_ms
2789 {
2790 let cutoff_affinity = now_ms - self.session_route_affinity_ttl_ms;
2791 affinities.retain(|sid, affinity| {
2792 active_sessions.contains_key(sid)
2793 || affinity.last_selected_at_ms >= cutoff_affinity
2794 });
2795 }
2796 prune_lru_cache(
2797 &mut affinities,
2798 self.session_route_affinity_max_entries,
2799 |entry| entry.last_selected_at_ms,
2800 );
2801 }
2802
2803 let keep_days: i32 = std::env::var("CODEX_HELPER_USAGE_ROLLUP_KEEP_DAYS")
2805 .ok()
2806 .and_then(|s| s.trim().parse::<i32>().ok())
2807 .filter(|&n| n > 0)
2808 .unwrap_or(60);
2809 let now_day = (now_ms / 86_400_000) as i32;
2810 let cutoff_day = now_day.saturating_sub(keep_days);
2811 let mut rollups = self.usage_rollups.write().await;
2812 for rollup in rollups.values_mut() {
2813 rollup.by_day.retain(|day, _| *day >= cutoff_day);
2814 rollup.by_config_day.retain(|_, m| {
2815 m.retain(|day, _| *day >= cutoff_day);
2816 !m.is_empty()
2817 });
2818 rollup.by_provider_day.retain(|_, m| {
2819 m.retain(|day, _| *day >= cutoff_day);
2820 !m.is_empty()
2821 });
2822 }
2823
2824 let cutoff_cwd =
2825 if self.session_cwd_cache_ttl_ms == 0 || now_ms < self.session_cwd_cache_ttl_ms {
2826 0
2827 } else {
2828 now_ms - self.session_cwd_cache_ttl_ms
2829 };
2830 self.prune_session_cwd_cache(&active_sessions, cutoff_cwd)
2831 .await;
2832 self.prune_session_transcript_path_cache(now_ms).await;
2833
2834 if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
2835 let cutoff_stats = now_ms - self.session_override_ttl_ms;
2836 let mut stats = self.session_stats.write().await;
2837 stats.retain(|sid, v| {
2838 active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_stats
2839 });
2840 }
2841 }
2842
2843 async fn prune_session_cwd_cache(&self, active_sessions: &HashMap<String, ()>, cutoff: u64) {
2844 if self.session_cwd_cache_max_entries == 0 {
2845 return;
2846 }
2847 let mut cache = self.session_cwd_cache.write().await;
2848
2849 if self.session_cwd_cache_ttl_ms > 0 {
2850 cache.retain(|sid, v| {
2851 if active_sessions.contains_key(sid) {
2852 return true;
2853 }
2854 v.last_seen_ms >= cutoff
2855 });
2856 }
2857
2858 let max = self.session_cwd_cache_max_entries;
2859 if max == 0 || cache.len() <= max {
2860 return;
2861 }
2862
2863 let mut keys = cache
2865 .iter()
2866 .map(|(sid, v)| (sid.clone(), v.last_seen_ms))
2867 .collect::<Vec<_>>();
2868 keys.sort_by_key(|(_, t)| *t);
2869 let remove_count = keys.len().saturating_sub(max);
2870 for (sid, _) in keys.into_iter().take(remove_count) {
2871 cache.remove(&sid);
2872 }
2873 }
2874
2875 async fn prune_session_transcript_path_cache(&self, now_ms: u64) {
2876 let mut cache = self.session_transcript_path_cache.write().await;
2877 if self.session_transcript_path_cache_max_entries == 0 {
2878 cache.clear();
2879 return;
2880 }
2881
2882 if self.session_transcript_path_cache_ttl_ms > 0 {
2883 let cutoff = now_ms.saturating_sub(self.session_transcript_path_cache_ttl_ms);
2884 cache.retain(|_, entry| entry.last_seen_ms >= cutoff);
2885 }
2886
2887 prune_lru_cache(
2888 &mut cache,
2889 self.session_transcript_path_cache_max_entries,
2890 |entry| entry.last_seen_ms,
2891 );
2892 }
2893}
2894
2895#[cfg(test)]
2896mod tests {
2897 use super::*;
2898
2899 use crate::config::{ServiceConfig, ServiceConfigManager, UpstreamAuth, UpstreamConfig};
2900 use crate::runtime_identity::ProviderEndpointKey;
2901
2902 fn test_runtime_policy(
2903 session_override_ttl_ms: u64,
2904 session_binding_ttl_ms: u64,
2905 session_binding_max_entries: usize,
2906 ) -> RuntimePolicy {
2907 RuntimePolicy {
2908 session_override_ttl_ms,
2909 session_binding_ttl_ms,
2910 session_binding_max_entries,
2911 session_route_affinity_ttl_ms: 0,
2912 session_route_affinity_max_entries: 5_000,
2913 session_cwd_cache_ttl_ms: 0,
2914 session_cwd_cache_max_entries: 0,
2915 session_transcript_path_cache_ttl_ms: 30_000,
2916 session_transcript_path_cache_max_entries: 5_000,
2917 }
2918 }
2919
2920 #[test]
2921 fn begin_and_finish_requests_keep_trace_id() {
2922 let runtime = tokio::runtime::Runtime::new().expect("runtime");
2923 runtime.block_on(async {
2924 let state = ProxyState::new();
2925 let request_id = state
2926 .begin_request(
2927 "codex",
2928 "POST",
2929 "/v1/responses",
2930 None,
2931 None,
2932 None,
2933 None,
2934 Some("gpt-5".to_string()),
2935 None,
2936 Some("priority".to_string()),
2937 100,
2938 )
2939 .await;
2940
2941 let active = state.list_active_requests().await;
2942 assert_eq!(active[0].trace_id.as_deref(), Some("codex-1"));
2943
2944 state
2945 .finish_request(FinishRequestParams {
2946 id: request_id,
2947 status_code: 200,
2948 duration_ms: 10,
2949 ended_at_ms: 110,
2950 observed_service_tier: Some("priority".to_string()),
2951 usage: None,
2952 retry: None,
2953 ttfb_ms: Some(4),
2954 streaming: false,
2955 })
2956 .await;
2957
2958 let recent = state.list_recent_finished(1).await;
2959 assert_eq!(recent[0].trace_id.as_deref(), Some("codex-1"));
2960 assert_eq!(recent[0].observability.trace_id.as_deref(), Some("codex-1"));
2961 assert!(recent[0].observability.fast_mode);
2962 assert_eq!(recent[0].observability.generation_ms, Some(6));
2963 });
2964 }
2965
2966 #[test]
2967 fn finish_request_estimates_cost_and_rolls_up_cost() {
2968 let runtime = tokio::runtime::Runtime::new().expect("runtime");
2969 runtime.block_on(async {
2970 let state = ProxyState::new();
2971 let request_id = state
2972 .begin_request(
2973 "codex",
2974 "POST",
2975 "/v1/responses",
2976 None,
2977 None,
2978 None,
2979 None,
2980 Some("gpt-5".to_string()),
2981 None,
2982 None,
2983 100,
2984 )
2985 .await;
2986
2987 state
2988 .finish_request(FinishRequestParams {
2989 id: request_id,
2990 status_code: 200,
2991 duration_ms: 10,
2992 ended_at_ms: 110,
2993 observed_service_tier: None,
2994 usage: Some(UsageMetrics {
2995 input_tokens: 1_000,
2996 output_tokens: 500,
2997 cached_input_tokens: 100,
2998 total_tokens: 1_500,
2999 ..UsageMetrics::default()
3000 }),
3001 retry: None,
3002 ttfb_ms: Some(4),
3003 streaming: false,
3004 })
3005 .await;
3006
3007 let recent = state.list_recent_finished(1).await;
3008 assert_eq!(recent[0].cost.total_cost_usd.as_deref(), Some("0.0061375"));
3009
3010 let rollup = state.get_usage_rollup_view("codex", 12, 1).await;
3011 assert_eq!(
3012 rollup.loaded.cost.total_cost_usd.as_deref(),
3013 Some("0.0061375")
3014 );
3015 assert_eq!(rollup.loaded.cost.priced_requests, 1);
3016 assert_eq!(rollup.loaded.cost.unpriced_requests, 0);
3017 });
3018 }
3019
    /// Window filtering test: one request ~10 days old (huge token count) and
    /// one fresh request (tiny token count). A 7-day window must rank only
    /// the fresh station/provider; the unwindowed (days = 0) view must rank
    /// the old ones first by total tokens.
    #[test]
    fn usage_rollup_view_scores_entities_inside_selected_window() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");
        runtime.block_on(async {
            let state = ProxyState::new();
            let now_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_millis() as u64)
                .unwrap_or(0);
            // 10 days ago — outside a 7-day window.
            let old_ms = now_ms.saturating_sub(10 * 86_400_000);

            // Old request: routed to old-station/old-provider, 100k tokens.
            let old_id = state
                .begin_request(
                    "codex",
                    "POST",
                    "/v1/responses",
                    None,
                    None,
                    None,
                    None,
                    Some("gpt-5".to_string()),
                    None,
                    None,
                    old_ms.saturating_sub(1_000),
                )
                .await;
            state
                .update_request_route(
                    old_id,
                    Some("old-station".to_string()),
                    Some("old-provider".to_string()),
                    "https://old.example/v1".to_string(),
                    None,
                )
                .await;
            state
                .finish_request(FinishRequestParams {
                    id: old_id,
                    status_code: 200,
                    duration_ms: 20,
                    ended_at_ms: old_ms,
                    observed_service_tier: None,
                    usage: Some(UsageMetrics {
                        total_tokens: 100_000,
                        ..UsageMetrics::default()
                    }),
                    retry: None,
                    ttfb_ms: Some(5),
                    streaming: false,
                })
                .await;

            // Fresh request: routed to fresh-station/fresh-provider, 10 tokens.
            let fresh_id = state
                .begin_request(
                    "codex",
                    "POST",
                    "/v1/responses",
                    None,
                    None,
                    None,
                    None,
                    Some("gpt-5".to_string()),
                    None,
                    None,
                    now_ms.saturating_sub(1_000),
                )
                .await;
            state
                .update_request_route(
                    fresh_id,
                    Some("fresh-station".to_string()),
                    Some("fresh-provider".to_string()),
                    "https://fresh.example/v1".to_string(),
                    None,
                )
                .await;
            state
                .finish_request(FinishRequestParams {
                    id: fresh_id,
                    status_code: 200,
                    duration_ms: 10,
                    ended_at_ms: now_ms,
                    observed_service_tier: None,
                    usage: Some(UsageMetrics {
                        total_tokens: 10,
                        ..UsageMetrics::default()
                    }),
                    retry: None,
                    ttfb_ms: Some(3),
                    streaming: false,
                })
                .await;

            // 7-day window: only the fresh request is inside; the day series
            // is padded to exactly 7 buckets.
            let week = state.get_usage_rollup_view("codex", 10, 7).await;
            assert_eq!(week.loaded.requests_total, 2);
            assert_eq!(week.window.requests_total, 1);
            assert_eq!(week.by_day.len(), 7);
            assert_eq!(week.by_config[0].0, "fresh-station");
            assert_eq!(week.by_provider[0].0, "fresh-provider");

            // days = 0 → whole loaded range; the old entities win on tokens.
            let loaded = state.get_usage_rollup_view("codex", 10, 0).await;
            assert_eq!(loaded.window.requests_total, 2);
            assert_eq!(loaded.by_config[0].0, "old-station");
            assert_eq!(loaded.by_provider[0].0, "old-provider");
        });
    }
3126
3127 #[test]
3128 fn build_session_identity_cards_merges_sources_and_sorts_newest_first() {
3129 let active = vec![ActiveRequest {
3130 id: 1,
3131 trace_id: Some("codex-1".to_string()),
3132 session_id: Some("sid-active".to_string()),
3133 client_name: Some("Frank-Laptop".to_string()),
3134 client_addr: Some("100.64.0.8".to_string()),
3135 cwd: Some("G:/codes/project".to_string()),
3136 model: Some("gpt-5.4".to_string()),
3137 reasoning_effort: Some("medium".to_string()),
3138 service_tier: Some("priority".to_string()),
3139 station_name: Some("right".to_string()),
3140 provider_id: Some("right".to_string()),
3141 upstream_base_url: Some("https://right.example/v1".to_string()),
3142 route_decision: None,
3143 service: "codex".to_string(),
3144 method: "POST".to_string(),
3145 path: "/v1/responses".to_string(),
3146 started_at_ms: 500,
3147 }];
3148 let recent = vec![
3149 FinishedRequest {
3150 id: 2,
3151 trace_id: Some("codex-2".to_string()),
3152 session_id: Some("sid-recent".to_string()),
3153 client_name: Some("Studio-Mini".to_string()),
3154 client_addr: Some("100.64.0.9".to_string()),
3155 cwd: Some("G:/codes/other".to_string()),
3156 model: Some("gpt-5.3".to_string()),
3157 reasoning_effort: Some("high".to_string()),
3158 service_tier: Some("default".to_string()),
3159 station_name: Some("vibe".to_string()),
3160 provider_id: Some("vibe".to_string()),
3161 upstream_base_url: Some("https://vibe.example/v1".to_string()),
3162 route_decision: None,
3163 usage: Some(UsageMetrics {
3164 input_tokens: 1,
3165 output_tokens: 2,
3166 reasoning_tokens: 3,
3167 total_tokens: 6,
3168 ..UsageMetrics::default()
3169 }),
3170 cost: CostBreakdown::default(),
3171 retry: None,
3172 observability: RequestObservability::default(),
3173 service: "codex".to_string(),
3174 method: "POST".to_string(),
3175 path: "/v1/responses".to_string(),
3176 status_code: 200,
3177 duration_ms: 1200,
3178 ttfb_ms: Some(100),
3179 streaming: false,
3180 ended_at_ms: 2_000,
3181 },
3182 FinishedRequest {
3183 id: 3,
3184 trace_id: Some("codex-3".to_string()),
3185 session_id: Some("sid-active".to_string()),
3186 client_name: Some("Frank-Laptop".to_string()),
3187 client_addr: Some("100.64.0.8".to_string()),
3188 cwd: Some("G:/codes/project".to_string()),
3189 model: Some("gpt-5.4".to_string()),
3190 reasoning_effort: Some("low".to_string()),
3191 service_tier: Some("flex".to_string()),
3192 station_name: Some("right".to_string()),
3193 provider_id: Some("right".to_string()),
3194 upstream_base_url: Some("https://right.example/v1".to_string()),
3195 route_decision: None,
3196 usage: None,
3197 cost: CostBreakdown::default(),
3198 retry: None,
3199 observability: RequestObservability::default(),
3200 service: "codex".to_string(),
3201 method: "POST".to_string(),
3202 path: "/v1/responses".to_string(),
3203 status_code: 429,
3204 duration_ms: 900,
3205 ttfb_ms: None,
3206 streaming: false,
3207 ended_at_ms: 1_000,
3208 },
3209 ];
3210 let overrides = HashMap::from([("sid-active".to_string(), "xhigh".to_string())]);
3211 let config_overrides = HashMap::from([("sid-active".to_string(), "temp".to_string())]);
3212 let model_overrides =
3213 HashMap::from([("sid-active".to_string(), "gpt-5.4-mini".to_string())]);
3214 let service_tier_overrides =
3215 HashMap::from([("sid-active".to_string(), "priority".to_string())]);
3216 let stats = HashMap::from([(
3217 "sid-active".to_string(),
3218 SessionStats {
3219 turns_total: 3,
3220 last_client_name: Some("Frank-Laptop".to_string()),
3221 last_client_addr: Some("100.64.0.8".to_string()),
3222 last_model: Some("gpt-5.4".to_string()),
3223 last_reasoning_effort: Some("low".to_string()),
3224 last_service_tier: Some("flex".to_string()),
3225 last_provider_id: Some("right".to_string()),
3226 last_station_name: Some("right".to_string()),
3227 last_route_decision: None,
3228 last_usage: None,
3229 total_usage: UsageMetrics {
3230 input_tokens: 10,
3231 output_tokens: 20,
3232 reasoning_tokens: 5,
3233 total_tokens: 35,
3234 ..UsageMetrics::default()
3235 },
3236 turns_with_usage: 2,
3237 last_status: Some(429),
3238 last_duration_ms: Some(900),
3239 last_ended_at_ms: Some(1_000),
3240 last_seen_ms: 1_000,
3241 },
3242 )]);
3243
3244 let cards = build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
3245 active: &active,
3246 recent: &recent,
3247 overrides: &overrides,
3248 station_overrides: &config_overrides,
3249 model_overrides: &model_overrides,
3250 service_tier_overrides: &service_tier_overrides,
3251 bindings: &HashMap::new(),
3252 route_affinities: &HashMap::new(),
3253 global_station_override: None,
3254 stats: &stats,
3255 });
3256
3257 assert_eq!(cards.len(), 2);
3258 assert_eq!(cards[0].session_id.as_deref(), Some("sid-recent"));
3259 assert_eq!(
3260 cards[0].observation_scope,
3261 SessionObservationScope::HostLocalEnriched
3262 );
3263 assert_eq!(cards[0].last_client_name.as_deref(), Some("Studio-Mini"));
3264 assert_eq!(cards[0].last_client_addr.as_deref(), Some("100.64.0.9"));
3265 assert_eq!(cards[1].session_id.as_deref(), Some("sid-active"));
3266 assert_eq!(
3267 cards[1].observation_scope,
3268 SessionObservationScope::HostLocalEnriched
3269 );
3270 assert_eq!(cards[1].active_count, 1);
3271 assert_eq!(cards[1].last_client_name.as_deref(), Some("Frank-Laptop"));
3272 assert_eq!(cards[1].last_client_addr.as_deref(), Some("100.64.0.8"));
3273 assert_eq!(cards[1].last_status, Some(429));
3274 assert_eq!(cards[1].override_effort.as_deref(), Some("xhigh"));
3275 assert_eq!(cards[1].override_station_name.as_deref(), Some("temp"));
3276 assert_eq!(cards[1].override_model.as_deref(), Some("gpt-5.4-mini"));
3277 assert_eq!(cards[1].override_service_tier.as_deref(), Some("priority"));
3278 assert_eq!(
3279 cards[1]
3280 .effective_model
3281 .as_ref()
3282 .map(|value| value.value.as_str()),
3283 Some("gpt-5.4-mini")
3284 );
3285 assert_eq!(
3286 cards[1].effective_model.as_ref().map(|value| value.source),
3287 Some(RouteValueSource::SessionOverride)
3288 );
3289 assert_eq!(
3290 cards[1]
3291 .effective_reasoning_effort
3292 .as_ref()
3293 .map(|value| value.source),
3294 Some(RouteValueSource::SessionOverride)
3295 );
3296 assert_eq!(
3297 cards[1]
3298 .effective_service_tier
3299 .as_ref()
3300 .map(|value| value.source),
3301 Some(RouteValueSource::SessionOverride)
3302 );
3303 assert_eq!(
3304 cards[1]
3305 .effective_station
3306 .as_ref()
3307 .map(|value| value.source),
3308 Some(RouteValueSource::SessionOverride)
3309 );
3310 assert!(cards[1].effective_upstream_base_url.is_none());
3311 assert_eq!(
3312 cards[1].last_upstream_base_url.as_deref(),
3313 Some("https://right.example/v1")
3314 );
3315 assert_eq!(cards[1].turns_total, Some(3));
3316 assert_eq!(cards[1].last_service_tier.as_deref(), Some("flex"));
3317 assert_eq!(
3318 cards[1].total_usage.as_ref().map(|u| u.total_tokens),
3319 Some(35)
3320 );
3321 }
3322
3323 #[test]
3324 fn build_session_identity_cards_prefers_binding_defaults_for_effective_route() {
3325 let active = vec![ActiveRequest {
3326 id: 1,
3327 trace_id: Some("codex-1".to_string()),
3328 session_id: Some("sid-bound".to_string()),
3329 client_name: Some("Workstation".to_string()),
3330 client_addr: Some("100.64.0.10".to_string()),
3331 cwd: None,
3332 model: Some("gpt-observed".to_string()),
3333 reasoning_effort: Some("medium".to_string()),
3334 service_tier: Some("default".to_string()),
3335 station_name: Some("right".to_string()),
3336 provider_id: None,
3337 upstream_base_url: None,
3338 route_decision: None,
3339 service: "codex".to_string(),
3340 method: "POST".to_string(),
3341 path: "/v1/responses".to_string(),
3342 started_at_ms: 10,
3343 }];
3344 let bindings = HashMap::from([(
3345 "sid-bound".to_string(),
3346 SessionBinding {
3347 session_id: "sid-bound".to_string(),
3348 profile_name: Some("daily".to_string()),
3349 station_name: Some("vibe".to_string()),
3350 model: Some("gpt-bound".to_string()),
3351 reasoning_effort: Some("high".to_string()),
3352 service_tier: Some("priority".to_string()),
3353 continuity_mode: SessionContinuityMode::DefaultProfile,
3354 created_at_ms: 1,
3355 updated_at_ms: 1,
3356 last_seen_ms: 10,
3357 },
3358 )]);
3359
3360 let cards = build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
3361 active: &active,
3362 recent: &[],
3363 overrides: &HashMap::new(),
3364 station_overrides: &HashMap::new(),
3365 model_overrides: &HashMap::new(),
3366 service_tier_overrides: &HashMap::new(),
3367 bindings: &bindings,
3368 route_affinities: &HashMap::new(),
3369 global_station_override: None,
3370 stats: &HashMap::new(),
3371 });
3372
3373 assert_eq!(cards[0].binding_profile_name.as_deref(), Some("daily"));
3374 assert_eq!(
3375 cards[0].observation_scope,
3376 SessionObservationScope::ObservedOnly
3377 );
3378 assert_eq!(
3379 cards[0].binding_continuity_mode,
3380 Some(SessionContinuityMode::DefaultProfile)
3381 );
3382 assert_eq!(
3383 cards[0]
3384 .effective_model
3385 .as_ref()
3386 .map(|value| (value.value.as_str(), value.source)),
3387 Some(("gpt-bound", RouteValueSource::ProfileDefault))
3388 );
3389 assert_eq!(
3390 cards[0]
3391 .effective_reasoning_effort
3392 .as_ref()
3393 .map(|value| (value.value.as_str(), value.source)),
3394 Some(("high", RouteValueSource::ProfileDefault))
3395 );
3396 assert_eq!(
3397 cards[0]
3398 .effective_service_tier
3399 .as_ref()
3400 .map(|value| (value.value.as_str(), value.source)),
3401 Some(("priority", RouteValueSource::ProfileDefault))
3402 );
3403 assert_eq!(
3404 cards[0]
3405 .effective_station
3406 .as_ref()
3407 .map(|value| (value.value.as_str(), value.source)),
3408 Some(("vibe", RouteValueSource::ProfileDefault))
3409 );
3410 }
3411
3412 #[test]
3413 fn build_session_identity_cards_keeps_binding_values_but_allows_global_config_override() {
3414 let active = vec![ActiveRequest {
3415 id: 1,
3416 trace_id: Some("codex-1".to_string()),
3417 session_id: Some("sid-bound".to_string()),
3418 client_name: Some("Workstation".to_string()),
3419 client_addr: Some("100.64.0.10".to_string()),
3420 cwd: None,
3421 model: Some("gpt-observed".to_string()),
3422 reasoning_effort: Some("medium".to_string()),
3423 service_tier: Some("default".to_string()),
3424 station_name: Some("vibe".to_string()),
3425 provider_id: None,
3426 upstream_base_url: Some("https://vibe.example/v1".to_string()),
3427 route_decision: None,
3428 service: "codex".to_string(),
3429 method: "POST".to_string(),
3430 path: "/v1/responses".to_string(),
3431 started_at_ms: 10,
3432 }];
3433 let bindings = HashMap::from([(
3434 "sid-bound".to_string(),
3435 SessionBinding {
3436 session_id: "sid-bound".to_string(),
3437 profile_name: Some("daily".to_string()),
3438 station_name: Some("vibe".to_string()),
3439 model: Some("gpt-bound".to_string()),
3440 reasoning_effort: Some("high".to_string()),
3441 service_tier: Some("priority".to_string()),
3442 continuity_mode: SessionContinuityMode::DefaultProfile,
3443 created_at_ms: 1,
3444 updated_at_ms: 1,
3445 last_seen_ms: 10,
3446 },
3447 )]);
3448
3449 let cards = build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
3450 active: &active,
3451 recent: &[],
3452 overrides: &HashMap::new(),
3453 station_overrides: &HashMap::new(),
3454 model_overrides: &HashMap::new(),
3455 service_tier_overrides: &HashMap::new(),
3456 bindings: &bindings,
3457 route_affinities: &HashMap::new(),
3458 global_station_override: Some("right"),
3459 stats: &HashMap::new(),
3460 });
3461
3462 assert_eq!(cards[0].binding_profile_name.as_deref(), Some("daily"));
3463 assert_eq!(
3464 cards[0]
3465 .effective_model
3466 .as_ref()
3467 .map(|value| (value.value.as_str(), value.source)),
3468 Some(("gpt-bound", RouteValueSource::ProfileDefault))
3469 );
3470 assert_eq!(
3471 cards[0]
3472 .effective_reasoning_effort
3473 .as_ref()
3474 .map(|value| (value.value.as_str(), value.source)),
3475 Some(("high", RouteValueSource::ProfileDefault))
3476 );
3477 assert_eq!(
3478 cards[0]
3479 .effective_service_tier
3480 .as_ref()
3481 .map(|value| (value.value.as_str(), value.source)),
3482 Some(("priority", RouteValueSource::ProfileDefault))
3483 );
3484 assert_eq!(
3485 cards[0]
3486 .effective_station
3487 .as_ref()
3488 .map(|value| (value.value.as_str(), value.source)),
3489 Some(("right", RouteValueSource::GlobalOverride))
3490 );
3491 assert!(cards[0].effective_upstream_base_url.is_none());
3492 }
3493
3494 #[test]
3495 fn enrich_session_identity_cards_with_runtime_applies_station_mapping_and_single_upstream() {
3496 let mut cards = vec![SessionIdentityCard {
3497 session_id: Some("sid-1".to_string()),
3498 last_model: Some("gpt-5.4".to_string()),
3499 last_station_name: Some("right".to_string()),
3500 last_upstream_base_url: Some("https://right.example/v1".to_string()),
3501 effective_model: Some(ResolvedRouteValue::new(
3502 "gpt-5.4",
3503 RouteValueSource::RequestPayload,
3504 )),
3505 effective_station: Some(ResolvedRouteValue::new(
3506 "right",
3507 RouteValueSource::RuntimeFallback,
3508 )),
3509 ..SessionIdentityCard::default()
3510 }];
3511
3512 let mut mgr = ServiceConfigManager {
3513 active: Some("right".to_string()),
3514 ..ServiceConfigManager::default()
3515 };
3516 mgr.configs.insert(
3517 "right".to_string(),
3518 ServiceConfig {
3519 name: "right".to_string(),
3520 alias: None,
3521 enabled: true,
3522 level: 1,
3523 upstreams: vec![UpstreamConfig {
3524 base_url: "https://right.example/v1".to_string(),
3525 auth: UpstreamAuth::default(),
3526 tags: HashMap::new(),
3527 supported_models: HashMap::new(),
3528 model_mapping: HashMap::from([(
3529 "gpt-5.4".to_string(),
3530 "gpt-5.4-fast".to_string(),
3531 )]),
3532 }],
3533 },
3534 );
3535
3536 enrich_session_identity_cards_with_runtime(&mut cards, &mgr);
3537
3538 assert_eq!(
3539 cards[0]
3540 .effective_model
3541 .as_ref()
3542 .map(|value| value.value.as_str()),
3543 Some("gpt-5.4-fast")
3544 );
3545 assert_eq!(
3546 cards[0].effective_model.as_ref().map(|value| value.source),
3547 Some(RouteValueSource::StationMapping)
3548 );
3549 assert_eq!(
3550 cards[0]
3551 .effective_upstream_base_url
3552 .as_ref()
3553 .map(|value| value.value.as_str()),
3554 Some("https://right.example/v1")
3555 );
3556 }
3557
3558 #[test]
3559 fn apply_session_profile_binding_sets_binding_and_clears_manual_overrides() {
3560 let runtime = tokio::runtime::Runtime::new().expect("runtime");
3561 runtime.block_on(async {
3562 let state = ProxyState::new();
3563 let now_ms = 42;
3564 let mut mgr = ServiceConfigManager::default();
3565 mgr.configs.insert(
3566 "right".to_string(),
3567 ServiceConfig {
3568 name: "right".to_string(),
3569 alias: None,
3570 enabled: true,
3571 level: 1,
3572 upstreams: vec![UpstreamConfig {
3573 base_url: "https://right.example/v1".to_string(),
3574 auth: UpstreamAuth::default(),
3575 tags: HashMap::from([
3576 ("supports_reasoning_effort".to_string(), "true".to_string()),
3577 ("supports_service_tier".to_string(), "true".to_string()),
3578 ]),
3579 supported_models: HashMap::from([("gpt-5.4".to_string(), true)]),
3580 model_mapping: HashMap::new(),
3581 }],
3582 },
3583 );
3584 mgr.profiles.insert(
3585 "fast".to_string(),
3586 crate::config::ServiceControlProfile {
3587 extends: None,
3588 station: Some("right".to_string()),
3589 model: Some("gpt-5.4".to_string()),
3590 reasoning_effort: Some("low".to_string()),
3591 service_tier: Some("flex".to_string()),
3592 },
3593 );
3594
3595 state
3596 .set_session_station_override("sid-1".to_string(), "other".to_string(), 1)
3597 .await;
3598 state
3599 .set_session_model_override("sid-1".to_string(), "gpt-x".to_string(), 1)
3600 .await;
3601 state
3602 .set_session_effort_override("sid-1".to_string(), "high".to_string(), 1)
3603 .await;
3604 state
3605 .set_session_service_tier_override("sid-1".to_string(), "priority".to_string(), 1)
3606 .await;
3607
3608 state
3609 .apply_session_profile_binding(
3610 "codex",
3611 &mgr,
3612 "sid-1".to_string(),
3613 "fast".to_string(),
3614 now_ms,
3615 )
3616 .await
3617 .expect("apply profile");
3618
3619 let binding = state
3620 .get_session_binding("sid-1")
3621 .await
3622 .expect("binding exists");
3623 assert_eq!(binding.profile_name.as_deref(), Some("fast"));
3624 assert_eq!(binding.station_name.as_deref(), Some("right"));
3625 assert_eq!(binding.model.as_deref(), Some("gpt-5.4"));
3626 assert_eq!(binding.reasoning_effort.as_deref(), Some("low"));
3627 assert_eq!(binding.service_tier.as_deref(), Some("flex"));
3628 assert_eq!(
3629 binding.continuity_mode,
3630 SessionContinuityMode::ManualProfile
3631 );
3632 assert_eq!(binding.updated_at_ms, now_ms);
3633 assert!(state.get_session_station_override("sid-1").await.is_none());
3634 assert!(state.get_session_model_override("sid-1").await.is_none());
3635 assert!(state.get_session_effort_override("sid-1").await.is_none());
3636 assert!(
3637 state
3638 .get_session_service_tier_override("sid-1")
3639 .await
3640 .is_none()
3641 );
3642 });
3643 }
3644
3645 #[test]
3646 fn list_session_manual_overrides_merges_all_dimensions() {
3647 let runtime = tokio::runtime::Runtime::new().expect("runtime");
3648 runtime.block_on(async {
3649 let state = ProxyState::new();
3650 state
3651 .set_session_reasoning_effort_override("sid-1".to_string(), "high".to_string(), 1)
3652 .await;
3653 state
3654 .set_session_station_override("sid-1".to_string(), "right".to_string(), 1)
3655 .await;
3656 state
3657 .set_session_model_override("sid-1".to_string(), "gpt-5.4".to_string(), 1)
3658 .await;
3659 state
3660 .set_session_service_tier_override("sid-1".to_string(), "priority".to_string(), 1)
3661 .await;
3662 state
3663 .set_session_model_override("sid-2".to_string(), "gpt-5.4-mini".to_string(), 2)
3664 .await;
3665
3666 let merged = state.list_session_manual_overrides().await;
3667 assert_eq!(merged.len(), 2);
3668 assert_eq!(
3669 merged
3670 .get("sid-1")
3671 .and_then(|overrides| overrides.reasoning_effort.as_deref()),
3672 Some("high")
3673 );
3674 assert_eq!(
3675 merged
3676 .get("sid-1")
3677 .and_then(|overrides| overrides.station_name.as_deref()),
3678 Some("right")
3679 );
3680 assert_eq!(
3681 merged
3682 .get("sid-1")
3683 .and_then(|overrides| overrides.model.as_deref()),
3684 Some("gpt-5.4")
3685 );
3686 assert_eq!(
3687 merged
3688 .get("sid-1")
3689 .and_then(|overrides| overrides.service_tier.as_deref()),
3690 Some("priority")
3691 );
3692 assert_eq!(
3693 merged
3694 .get("sid-2")
3695 .and_then(|overrides| overrides.model.as_deref()),
3696 Some("gpt-5.4-mini")
3697 );
3698 assert!(
3699 merged
3700 .get("sid-2")
3701 .is_some_and(|overrides| overrides.reasoning_effort.is_none())
3702 );
3703 });
3704 }
3705
3706 #[test]
3707 fn provider_balance_snapshots_are_recorded_and_refreshed() {
3708 let runtime = tokio::runtime::Runtime::new().expect("runtime");
3709 runtime.block_on(async {
3710 let state = ProxyState::new();
3711 state
3712 .record_provider_balance_snapshot(
3713 "codex",
3714 ProviderBalanceSnapshot {
3715 provider_id: "packycode".to_string(),
3716 station_name: Some("right".to_string()),
3717 upstream_index: Some(2),
3718 source: "usage_provider:budget_http_json".to_string(),
3719 fetched_at_ms: 10,
3720 stale_after_ms: Some(0),
3721 stale: false,
3722 status: BalanceSnapshotStatus::Unknown,
3723 exhausted: Some(false),
3724 exhaustion_affects_routing: true,
3725 plan_name: None,
3726 total_balance_usd: Some("3.5".to_string()),
3727 subscription_balance_usd: None,
3728 paygo_balance_usd: None,
3729 monthly_budget_usd: Some("5".to_string()),
3730 monthly_spent_usd: Some("1.5".to_string()),
3731 quota_period: None,
3732 quota_remaining_usd: None,
3733 quota_limit_usd: None,
3734 quota_used_usd: None,
3735 unlimited_quota: None,
3736 total_used_usd: None,
3737 today_used_usd: None,
3738 total_requests: None,
3739 today_requests: None,
3740 total_tokens: None,
3741 today_tokens: None,
3742 error: None,
3743 },
3744 )
3745 .await;
3746
3747 let view = state.get_provider_balance_view("codex").await;
3748 let balances = view.get("right").expect("station balance");
3749 assert_eq!(balances.len(), 1);
3750 assert_eq!(balances[0].provider_id, "packycode");
3751 assert_eq!(balances[0].status, BalanceSnapshotStatus::Stale);
3752 assert_eq!(balances[0].exhausted, Some(false));
3753
3754 let summary = state
3755 .get_provider_balance_summary_view("codex")
3756 .await
3757 .get("right")
3758 .cloned()
3759 .expect("station balance summary");
3760 assert_eq!(summary.snapshots, 1);
3761 assert_eq!(summary.stale, 1);
3762 assert_eq!(summary.routing_snapshots, 1);
3763 });
3764 }
3765
3766 #[test]
3767 fn provider_balance_snapshots_keep_multiple_providers_per_upstream() {
3768 let runtime = tokio::runtime::Runtime::new().expect("runtime");
3769 runtime.block_on(async {
3770 let state = ProxyState::new();
3771 for (provider_id, status) in [
3772 ("general", BalanceSnapshotStatus::Error),
3773 ("newapi", BalanceSnapshotStatus::Ok),
3774 ] {
3775 state
3776 .record_provider_balance_snapshot(
3777 "codex",
3778 ProviderBalanceSnapshot {
3779 provider_id: provider_id.to_string(),
3780 station_name: Some("routing".to_string()),
3781 upstream_index: Some(1),
3782 source: "usage_provider:test".to_string(),
3783 fetched_at_ms: 10,
3784 stale_after_ms: None,
3785 stale: false,
3786 status,
3787 exhausted: if status == BalanceSnapshotStatus::Ok {
3788 Some(false)
3789 } else {
3790 None
3791 },
3792 exhaustion_affects_routing: true,
3793 plan_name: None,
3794 total_balance_usd: if status == BalanceSnapshotStatus::Ok {
3795 Some("3.5".to_string())
3796 } else {
3797 None
3798 },
3799 subscription_balance_usd: None,
3800 paygo_balance_usd: None,
3801 monthly_budget_usd: None,
3802 monthly_spent_usd: None,
3803 quota_period: None,
3804 quota_remaining_usd: None,
3805 quota_limit_usd: None,
3806 quota_used_usd: None,
3807 unlimited_quota: None,
3808 total_used_usd: None,
3809 today_used_usd: None,
3810 total_requests: None,
3811 today_requests: None,
3812 total_tokens: None,
3813 today_tokens: None,
3814 error: if status == BalanceSnapshotStatus::Error {
3815 Some("decode failed".to_string())
3816 } else {
3817 None
3818 },
3819 },
3820 )
3821 .await;
3822 }
3823
3824 let view = state.get_provider_balance_view("codex").await;
3825 let balances = view.get("routing").expect("station balance");
3826 assert_eq!(balances.len(), 2);
3827 assert_eq!(
3828 balances
3829 .iter()
3830 .map(|snapshot| snapshot.provider_id.as_str())
3831 .collect::<Vec<_>>(),
3832 vec!["general", "newapi"]
3833 );
3834 });
3835 }
3836
3837 #[test]
3838 fn prune_runtime_observability_keeps_catalog_provider_balances_on_routing_layout_change() {
3839 let runtime = tokio::runtime::Runtime::new().expect("runtime");
3840 runtime.block_on(async {
3841 let state = ProxyState::new();
3842
3843 let mut initial_mgr = ServiceConfigManager::default();
3844 initial_mgr.configs.insert(
3845 "routing".to_string(),
3846 ServiceConfig {
3847 name: "routing".to_string(),
3848 alias: None,
3849 enabled: true,
3850 level: 1,
3851 upstreams: vec![
3852 UpstreamConfig {
3853 base_url: "https://input.example/v1".to_string(),
3854 auth: UpstreamAuth::default(),
3855 tags: HashMap::from([("provider_id".to_string(), "input".to_string())]),
3856 supported_models: HashMap::new(),
3857 model_mapping: HashMap::new(),
3858 },
3859 UpstreamConfig {
3860 base_url: "https://backup.example/v1".to_string(),
3861 auth: UpstreamAuth::default(),
3862 tags: HashMap::from([(
3863 "provider_id".to_string(),
3864 "backup".to_string(),
3865 )]),
3866 supported_models: HashMap::new(),
3867 model_mapping: HashMap::new(),
3868 },
3869 ],
3870 },
3871 );
3872 state
3873 .prune_runtime_observability_for_service("codex", &initial_mgr)
3874 .await;
3875
3876 state
3877 .record_provider_balance_snapshot(
3878 "codex",
3879 ProviderBalanceSnapshot {
3880 provider_id: "input".to_string(),
3881 station_name: Some("input".to_string()),
3882 upstream_index: Some(0),
3883 source: "usage_provider:test".to_string(),
3884 fetched_at_ms: 10,
3885 stale_after_ms: None,
3886 stale: false,
3887 status: BalanceSnapshotStatus::Ok,
3888 exhausted: Some(false),
3889 exhaustion_affects_routing: true,
3890 total_balance_usd: Some("3.5".to_string()),
3891 ..ProviderBalanceSnapshot::default()
3892 },
3893 )
3894 .await;
3895 state
3896 .record_provider_balance_snapshot(
3897 "codex",
3898 ProviderBalanceSnapshot {
3899 provider_id: "input".to_string(),
3900 station_name: Some("routing".to_string()),
3901 upstream_index: Some(0),
3902 source: "usage_provider:test".to_string(),
3903 fetched_at_ms: 10,
3904 stale_after_ms: None,
3905 stale: false,
3906 status: BalanceSnapshotStatus::Ok,
3907 exhausted: Some(false),
3908 exhaustion_affects_routing: true,
3909 total_balance_usd: Some("3.5".to_string()),
3910 ..ProviderBalanceSnapshot::default()
3911 },
3912 )
3913 .await;
3914
3915 let mut pinned_mgr = ServiceConfigManager::default();
3916 pinned_mgr.configs.insert(
3917 "routing".to_string(),
3918 ServiceConfig {
3919 name: "routing".to_string(),
3920 alias: None,
3921 enabled: true,
3922 level: 1,
3923 upstreams: vec![UpstreamConfig {
3924 base_url: "https://input.example/v1".to_string(),
3925 auth: UpstreamAuth::default(),
3926 tags: HashMap::from([("provider_id".to_string(), "input".to_string())]),
3927 supported_models: HashMap::new(),
3928 model_mapping: HashMap::new(),
3929 }],
3930 },
3931 );
3932 state
3933 .prune_runtime_observability_for_service("codex", &pinned_mgr)
3934 .await;
3935
3936 let view = state.get_provider_balance_view("codex").await;
3937 assert!(view.contains_key("input"));
3938 assert!(!view.contains_key("routing"));
3939 });
3940 }
3941
3942 #[test]
3943 fn prune_runtime_observability_removes_stale_service_keys() {
3944 let runtime = tokio::runtime::Runtime::new().expect("runtime");
3945 runtime.block_on(async {
3946 let state = ProxyState::new();
3947
3948 state
3949 .set_station_enabled_override("codex", "old".to_string(), false, 1)
3950 .await;
3951 state
3952 .set_upstream_enabled_override(
3953 "codex",
3954 "https://old.example/v1".to_string(),
3955 false,
3956 1,
3957 )
3958 .await;
3959 state
3960 .set_provider_endpoint_enabled_override(
3961 "codex",
3962 ProviderEndpointKey::new("codex", "provider-old", "default"),
3963 false,
3964 1,
3965 )
3966 .await;
3967 state
3968 .set_provider_endpoint_runtime_state_override(
3969 "codex",
3970 ProviderEndpointKey::new("codex", "provider-old", "default"),
3971 RuntimeConfigState::BreakerOpen,
3972 1,
3973 )
3974 .await;
3975 state
3976 .record_station_health(
3977 "codex",
3978 "old".to_string(),
3979 StationHealth {
3980 checked_at_ms: 10,
3981 upstreams: vec![UpstreamHealth {
3982 base_url: "https://old.example/v1".to_string(),
3983 ok: Some(false),
3984 status_code: Some(500),
3985 latency_ms: Some(10),
3986 error: Some("boom".to_string()),
3987 passive: None,
3988 }],
3989 },
3990 )
3991 .await;
3992 state
3993 .record_passive_upstream_failure(PassiveUpstreamFailureRecord {
3994 service_name: "codex".to_string(),
3995 station_name: "old".to_string(),
3996 base_url: "https://old.example/v1".to_string(),
3997 status_code: Some(500),
3998 error_class: Some("upstream_transport_error".to_string()),
3999 error: Some("boom".to_string()),
4000 now_ms: 20,
4001 })
4002 .await;
4003 state
4004 .record_provider_balance_snapshot(
4005 "codex",
4006 ProviderBalanceSnapshot {
4007 provider_id: "provider-old".to_string(),
4008 station_name: Some("old".to_string()),
4009 upstream_index: Some(0),
4010 source: "usage_provider:budget_http_json".to_string(),
4011 fetched_at_ms: 10,
4012 stale_after_ms: None,
4013 stale: false,
4014 status: BalanceSnapshotStatus::Ok,
4015 exhausted: Some(false),
4016 exhaustion_affects_routing: true,
4017 plan_name: None,
4018 total_balance_usd: Some("3.5".to_string()),
4019 subscription_balance_usd: None,
4020 paygo_balance_usd: None,
4021 monthly_budget_usd: None,
4022 monthly_spent_usd: None,
4023 quota_period: None,
4024 quota_remaining_usd: None,
4025 quota_limit_usd: None,
4026 quota_used_usd: None,
4027 unlimited_quota: None,
4028 total_used_usd: None,
4029 today_used_usd: None,
4030 total_requests: None,
4031 today_requests: None,
4032 total_tokens: None,
4033 today_tokens: None,
4034 error: None,
4035 },
4036 )
4037 .await;
4038
4039 let request_id = state
4040 .begin_request(
4041 "codex",
4042 "POST",
4043 "/v1/responses",
4044 Some("sid-old".to_string()),
4045 None,
4046 None,
4047 None,
4048 Some("gpt-5".to_string()),
4049 None,
4050 None,
4051 30,
4052 )
4053 .await;
4054 state
4055 .update_request_route(
4056 request_id,
4057 Some("old".to_string()),
4058 Some("provider-old".to_string()),
4059 "https://old.example/v1".to_string(),
4060 None,
4061 )
4062 .await;
4063 state
4064 .finish_request(FinishRequestParams {
4065 id: request_id,
4066 status_code: 200,
4067 duration_ms: 5,
4068 ended_at_ms: 35,
4069 observed_service_tier: None,
4070 usage: None,
4071 retry: None,
4072 ttfb_ms: None,
4073 streaming: false,
4074 })
4075 .await;
4076
4077 let mut mgr = ServiceConfigManager::default();
4078 mgr.configs.insert(
4079 "new".to_string(),
4080 ServiceConfig {
4081 name: "new".to_string(),
4082 alias: None,
4083 enabled: true,
4084 level: 1,
4085 upstreams: vec![UpstreamConfig {
4086 base_url: "https://new.example/v1".to_string(),
4087 auth: UpstreamAuth::default(),
4088 tags: HashMap::from([(
4089 "provider_id".to_string(),
4090 "provider-new".to_string(),
4091 )]),
4092 supported_models: HashMap::new(),
4093 model_mapping: HashMap::new(),
4094 }],
4095 },
4096 );
4097
4098 state
4099 .prune_runtime_observability_for_service("codex", &mgr)
4100 .await;
4101
4102 assert!(state.get_station_meta_overrides("codex").await.is_empty());
4103 assert!(state.get_upstream_meta_overrides("codex").await.is_empty());
4104 assert!(state.get_station_health("codex").await.is_empty());
4105 assert!(state.get_provider_balance_view("codex").await.is_empty());
4106
4107 let rollup = state.get_usage_rollup_view("codex", 10, 10).await;
4108 assert!(rollup.by_config.is_empty());
4109 assert!(rollup.by_provider.is_empty());
4110 });
4111 }
4112
4113 #[test]
4114 fn get_upstream_meta_overrides_merges_endpoint_and_legacy_base_url_entries() {
4115 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4116 runtime.block_on(async {
4117 let state = ProxyState::new();
4118
4119 state
4120 .set_provider_endpoint_enabled_override(
4121 "codex",
4122 ProviderEndpointKey::new("codex", "alpha", "default"),
4123 false,
4124 1,
4125 )
4126 .await;
4127 state
4128 .set_provider_endpoint_runtime_state_override(
4129 "codex",
4130 ProviderEndpointKey::new("codex", "alpha", "default"),
4131 RuntimeConfigState::BreakerOpen,
4132 2,
4133 )
4134 .await;
4135 state
4136 .set_upstream_enabled_override(
4137 "codex",
4138 "https://legacy.example/v1".to_string(),
4139 true,
4140 3,
4141 )
4142 .await;
4143 state
4144 .set_upstream_runtime_state_override(
4145 "codex",
4146 "https://legacy.example/v1".to_string(),
4147 RuntimeConfigState::Draining,
4148 4,
4149 )
4150 .await;
4151
4152 let overrides = state.get_upstream_meta_overrides("codex").await;
4153
4154 assert_eq!(
4155 overrides.get("codex/alpha/default"),
4156 Some(&(Some(false), Some(RuntimeConfigState::BreakerOpen)))
4157 );
4158 assert_eq!(
4159 overrides.get("https://legacy.example/v1"),
4160 Some(&(Some(true), Some(RuntimeConfigState::Draining)))
4161 );
4162 });
4163 }
4164
4165 #[test]
4166 fn provider_endpoint_runtime_health_is_keyed_by_provider_endpoint() {
4167 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4168 runtime.block_on(async {
4169 let state = ProxyState::new();
4170 let monthly = ProviderEndpointKey::new("codex", "monthly", "default");
4171 let fallback = ProviderEndpointKey::new("codex", "fallback", "default");
4172
4173 state
4174 .record_provider_endpoint_attempt_success("codex", fallback.clone(), 10)
4175 .await;
4176 state
4177 .set_provider_endpoint_usage_exhausted("codex", monthly.clone(), true)
4178 .await;
4179 state
4180 .record_provider_endpoint_attempt_failure(
4181 "codex",
4182 monthly.clone(),
4183 0,
4184 crate::lb::CooldownBackoff {
4185 factor: 1,
4186 max_secs: 0,
4187 },
4188 )
4189 .await;
4190 state
4191 .record_provider_endpoint_attempt_failure(
4192 "codex",
4193 monthly.clone(),
4194 0,
4195 crate::lb::CooldownBackoff {
4196 factor: 1,
4197 max_secs: 0,
4198 },
4199 )
4200 .await;
4201 state
4202 .record_provider_endpoint_attempt_failure(
4203 "codex",
4204 monthly.clone(),
4205 30,
4206 crate::lb::CooldownBackoff {
4207 factor: 1,
4208 max_secs: 0,
4209 },
4210 )
4211 .await;
4212
4213 let runtime = state
4214 .route_plan_runtime_state_for_provider_endpoints("codex")
4215 .await;
4216 let monthly_state = runtime.provider_endpoint(&monthly);
4217 assert_eq!(monthly_state.failure_count, crate::lb::FAILURE_THRESHOLD);
4218 assert!(monthly_state.cooldown_active);
4219 assert!(monthly_state.usage_exhausted);
4220 assert_eq!(runtime.affinity_provider_endpoint(), Some(&fallback));
4221
4222 state
4223 .set_provider_endpoint_runtime_state_override(
4224 "codex",
4225 fallback.clone(),
4226 RuntimeConfigState::BreakerOpen,
4227 20,
4228 )
4229 .await;
4230 let runtime = state
4231 .route_plan_runtime_state_for_provider_endpoints("codex")
4232 .await;
4233 assert!(runtime.provider_endpoint(&fallback).runtime_disabled);
4234 });
4235 }
4236
4237 #[test]
4238 fn get_station_health_merges_passive_runtime_observations() {
4239 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4240 runtime.block_on(async {
4241 let state = ProxyState::new();
4242 state
4243 .record_station_health(
4244 "codex",
4245 "right".to_string(),
4246 StationHealth {
4247 checked_at_ms: 10,
4248 upstreams: vec![UpstreamHealth {
4249 base_url: "https://right.example/v1".to_string(),
4250 ok: Some(true),
4251 status_code: Some(200),
4252 latency_ms: Some(120),
4253 error: None,
4254 passive: None,
4255 }],
4256 },
4257 )
4258 .await;
4259 state
4260 .record_passive_upstream_failure(PassiveUpstreamFailureRecord {
4261 service_name: "codex".to_string(),
4262 station_name: "right".to_string(),
4263 base_url: "https://right.example/v1".to_string(),
4264 status_code: Some(500),
4265 error_class: Some("cloudflare_timeout".to_string()),
4266 error: Some("upstream timed out".to_string()),
4267 now_ms: 20,
4268 })
4269 .await;
4270 state
4271 .record_passive_upstream_success(
4272 "codex",
4273 "right",
4274 "https://backup.example/v1",
4275 Some(200),
4276 30,
4277 )
4278 .await;
4279
4280 let health = state.get_station_health("codex").await;
4281 let right = health.get("right").expect("right health");
4282 assert_eq!(right.checked_at_ms, 30);
4283 assert_eq!(right.upstreams.len(), 2);
4284
4285 let primary = right
4286 .upstreams
4287 .iter()
4288 .find(|upstream| upstream.base_url == "https://right.example/v1")
4289 .expect("primary upstream");
4290 assert_eq!(primary.ok, Some(true));
4291 let primary_passive = primary.passive.as_ref().expect("primary passive");
4292 assert_eq!(primary_passive.state, PassiveHealthState::Degraded);
4293 assert_eq!(primary_passive.score, 50);
4294 assert_eq!(primary_passive.last_status_code, Some(500));
4295 assert_eq!(
4296 primary_passive.last_error_class.as_deref(),
4297 Some("cloudflare_timeout")
4298 );
4299
4300 let backup = right
4301 .upstreams
4302 .iter()
4303 .find(|upstream| upstream.base_url == "https://backup.example/v1")
4304 .expect("backup upstream");
4305 assert_eq!(backup.ok, None);
4306 let backup_passive = backup.passive.as_ref().expect("backup passive");
4307 assert_eq!(backup_passive.state, PassiveHealthState::Healthy);
4308 assert_eq!(backup_passive.score, 100);
4309 assert_eq!(backup_passive.last_status_code, Some(200));
4310 });
4311 }
4312
4313 #[test]
4314 fn passive_health_success_recovers_after_failure() {
4315 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4316 runtime.block_on(async {
4317 let state = ProxyState::new();
4318 state
4319 .record_passive_upstream_failure(PassiveUpstreamFailureRecord {
4320 service_name: "codex".to_string(),
4321 station_name: "right".to_string(),
4322 base_url: "https://right.example/v1".to_string(),
4323 status_code: Some(500),
4324 error_class: Some("cloudflare_timeout".to_string()),
4325 error: Some("upstream timed out".to_string()),
4326 now_ms: 10,
4327 })
4328 .await;
4329 state
4330 .record_passive_upstream_success(
4331 "codex",
4332 "right",
4333 "https://right.example/v1",
4334 Some(200),
4335 20,
4336 )
4337 .await;
4338
4339 let health = state.get_station_health("codex").await;
4340 let right = health.get("right").expect("right health");
4341 let upstream = right.upstreams.first().expect("upstream");
4342 let passive = upstream.passive.as_ref().expect("passive");
4343 assert_eq!(passive.state, PassiveHealthState::Healthy);
4344 assert_eq!(passive.score, 100);
4345 assert_eq!(passive.consecutive_failures, 0);
4346 assert_eq!(passive.last_success_at_ms, Some(20));
4347 assert_eq!(passive.last_failure_at_ms, Some(10));
4348 assert_eq!(passive.last_error_class, None);
4349 assert_eq!(passive.last_error, None);
4350 });
4351 }
4352
4353 #[test]
4354 fn apply_session_profile_binding_uses_inherited_profile_values() {
4355 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4356 runtime.block_on(async {
4357 let state = ProxyState::new();
4358 let mut mgr = ServiceConfigManager::default();
4359 mgr.configs.insert(
4360 "right".to_string(),
4361 ServiceConfig {
4362 name: "right".to_string(),
4363 alias: None,
4364 enabled: true,
4365 level: 1,
4366 upstreams: vec![UpstreamConfig {
4367 base_url: "https://right.example/v1".to_string(),
4368 auth: UpstreamAuth::default(),
4369 tags: HashMap::from([
4370 ("supports_reasoning_effort".to_string(), "true".to_string()),
4371 ("supports_service_tier".to_string(), "true".to_string()),
4372 ]),
4373 supported_models: HashMap::from([("gpt-5.4".to_string(), true)]),
4374 model_mapping: HashMap::new(),
4375 }],
4376 },
4377 );
4378 mgr.profiles.insert(
4379 "base".to_string(),
4380 crate::config::ServiceControlProfile {
4381 extends: None,
4382 station: Some("right".to_string()),
4383 model: Some("gpt-5.4".to_string()),
4384 reasoning_effort: None,
4385 service_tier: Some("priority".to_string()),
4386 },
4387 );
4388 mgr.profiles.insert(
4389 "fast".to_string(),
4390 crate::config::ServiceControlProfile {
4391 extends: Some("base".to_string()),
4392 station: None,
4393 model: None,
4394 reasoning_effort: Some("low".to_string()),
4395 service_tier: None,
4396 },
4397 );
4398
4399 state
4400 .apply_session_profile_binding(
4401 "codex",
4402 &mgr,
4403 "sid-inherited".to_string(),
4404 "fast".to_string(),
4405 100,
4406 )
4407 .await
4408 .expect("apply inherited profile");
4409
4410 let binding = state
4411 .get_session_binding("sid-inherited")
4412 .await
4413 .expect("binding exists");
4414 assert_eq!(binding.profile_name.as_deref(), Some("fast"));
4415 assert_eq!(binding.station_name.as_deref(), Some("right"));
4416 assert_eq!(binding.model.as_deref(), Some("gpt-5.4"));
4417 assert_eq!(binding.reasoning_effort.as_deref(), Some("low"));
4418 assert_eq!(binding.service_tier.as_deref(), Some("priority"));
4419 });
4420 }
4421
4422 #[test]
4423 fn prune_periodic_keeps_sticky_binding_after_manual_override_ttl_expires() {
4424 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4425 runtime.block_on(async {
4426 let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(1, 0, 2_000));
4427 state
4428 .set_session_model_override("sid-sticky".to_string(), "gpt-5.4".to_string(), 0)
4429 .await;
4430 state
4431 .set_session_binding(SessionBinding {
4432 session_id: "sid-sticky".to_string(),
4433 profile_name: Some("daily".to_string()),
4434 station_name: Some("right".to_string()),
4435 model: Some("gpt-5.4".to_string()),
4436 reasoning_effort: Some("medium".to_string()),
4437 service_tier: Some("default".to_string()),
4438 continuity_mode: SessionContinuityMode::DefaultProfile,
4439 created_at_ms: 0,
4440 updated_at_ms: 0,
4441 last_seen_ms: 0,
4442 })
4443 .await;
4444
4445 state.prune_periodic().await;
4446
4447 assert!(
4448 state
4449 .get_session_model_override("sid-sticky")
4450 .await
4451 .is_none()
4452 );
4453 assert!(state.get_session_binding("sid-sticky").await.is_some());
4454 });
4455 }
4456
4457 #[test]
4458 fn prune_periodic_honors_opt_in_binding_ttl() {
4459 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4460 runtime.block_on(async {
4461 let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(0, 1, 2_000));
4462 state
4463 .set_session_binding(SessionBinding {
4464 session_id: "sid-expire".to_string(),
4465 profile_name: Some("daily".to_string()),
4466 station_name: Some("right".to_string()),
4467 model: Some("gpt-5.4".to_string()),
4468 reasoning_effort: Some("medium".to_string()),
4469 service_tier: Some("default".to_string()),
4470 continuity_mode: SessionContinuityMode::DefaultProfile,
4471 created_at_ms: 0,
4472 updated_at_ms: 0,
4473 last_seen_ms: 0,
4474 })
4475 .await;
4476
4477 state.prune_periodic().await;
4478
4479 assert!(state.get_session_binding("sid-expire").await.is_none());
4480 });
4481 }
4482
4483 #[test]
4484 fn session_route_affinity_ttl_is_enforced_on_read() {
4485 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4486 runtime.block_on(async {
4487 let mut policy = test_runtime_policy(0, 0, 2_000);
4488 policy.session_route_affinity_ttl_ms = 1;
4489 let state = ProxyState::new_with_runtime_policy(None, policy);
4490 state
4491 .record_session_route_affinity_success(
4492 "sid-expire",
4493 SessionRouteAffinityTarget {
4494 route_graph_key: "graph".to_string(),
4495 provider_endpoint: ProviderEndpointKey::new("codex", "monthly", "default"),
4496 upstream_base_url: "https://monthly.example/v1".to_string(),
4497 route_path: vec!["monthly_first".to_string(), "monthly".to_string()],
4498 },
4499 Some("first_success".to_string()),
4500 0,
4501 )
4502 .await;
4503
4504 assert!(
4505 state
4506 .get_session_route_affinity("sid-expire")
4507 .await
4508 .is_none()
4509 );
4510 assert!(state.list_session_route_affinities().await.is_empty());
4511 });
4512 }
4513
4514 #[test]
4515 fn prune_periodic_caps_sticky_bindings_by_last_seen() {
4516 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4517 runtime.block_on(async {
4518 let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(0, 0, 2));
4519 for idx in 0..3 {
4520 state
4521 .set_session_binding(SessionBinding {
4522 session_id: format!("sid-{idx}"),
4523 profile_name: Some("daily".to_string()),
4524 station_name: Some("right".to_string()),
4525 model: Some("gpt-5.4".to_string()),
4526 reasoning_effort: Some("medium".to_string()),
4527 service_tier: Some("default".to_string()),
4528 continuity_mode: SessionContinuityMode::DefaultProfile,
4529 created_at_ms: idx,
4530 updated_at_ms: idx,
4531 last_seen_ms: idx,
4532 })
4533 .await;
4534 }
4535
4536 state.prune_periodic().await;
4537
4538 assert!(state.get_session_binding("sid-0").await.is_none());
4539 assert!(state.get_session_binding("sid-1").await.is_some());
4540 assert!(state.get_session_binding("sid-2").await.is_some());
4541 });
4542 }
4543
4544 #[test]
4545 fn transcript_path_cache_records_negative_lookups() {
4546 let runtime = tokio::runtime::Runtime::new().expect("runtime");
4547 runtime.block_on(async {
4548 let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(0, 0, 2_000));
4549 let paths = state
4550 .resolve_host_transcript_paths_cached(&["missing-session".to_string()])
4551 .await;
4552
4553 assert_eq!(paths.get("missing-session"), Some(&None));
4554 let cache = state.session_transcript_path_cache.read().await;
4555 assert!(cache.contains_key("missing-session"));
4556 });
4557 }
4558}