use std::collections::{HashMap, HashSet, VecDeque};
use std::io::{Read, Seek, SeekFrom};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use serde_json::Value as JsonValue;
use tokio::sync::RwLock;
use tokio::time::{Duration, interval};
pub use crate::balance::{
BalanceSnapshotStatus, ProviderBalanceSnapshot, StationRoutingBalanceSummary,
};
use crate::config::ServiceConfigManager;
use crate::lb::{COOLDOWN_SECS, CooldownBackoff, FAILURE_THRESHOLD, LbState};
#[cfg(test)]
use crate::pricing::CostBreakdown;
use crate::pricing::{CostAdjustments, estimate_request_cost_from_operator_catalog_for_service};
use crate::routing_ir::{RoutePlanRuntimeState, RoutePlanUpstreamRuntimeState};
use crate::runtime_identity::ProviderEndpointKey;
use crate::sessions;
use crate::usage::UsageMetrics;
mod runtime_types;
mod session_identity;
use self::runtime_types::{
ConfigMetaOverride, RuntimeDefaultProfileOverride, UsageRollup, merge_station_health,
};
pub use self::runtime_types::{
HealthCheckStatus, LbConfigView, LbUpstreamView, PassiveHealthState, PassiveUpstreamHealth,
RuntimeConfigState, StationHealth, UpstreamHealth, UsageBucket, UsageRollupCoverage,
UsageRollupView,
};
pub use self::session_identity::{
ActiveRequest, FinishRequestParams, FinishedRequest, RequestObservability, ResolvedRouteValue,
RouteDecisionProvenance, RouteValueSource, SessionBinding, SessionContinuityMode,
SessionIdentityCard, SessionIdentityCardBuildInputs, SessionManualOverrides,
SessionObservationScope, SessionRouteAffinity, SessionRouteAffinityTarget, SessionStats,
build_session_identity_cards_from_parts, enrich_session_identity_cards_with_host_transcripts,
enrich_session_identity_cards_with_runtime,
};
use self::session_identity::{
SessionBindingEntry, SessionCwdCacheEntry, SessionEffortOverride, SessionModelOverride,
SessionRouteTargetOverride, SessionServiceTierOverride, SessionStationOverride,
};
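/// Passive upstream health keyed as service -> station -> upstream base URL.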
type PassiveStationHealthMap =
HashMap<String, HashMap<String, HashMap<String, PassiveUpstreamHealth>>>;
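/// Provider balance snapshots keyed as service -> station -> upstream index -> provider id.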
type ProviderBalanceMap =
HashMap<String, HashMap<String, HashMap<usize, HashMap<String, ProviderBalanceSnapshot>>>>;
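/// Station routing-balance summaries keyed as service -> station.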
type ProviderBalanceSummaryMap = HashMap<String, HashMap<String, StationRoutingBalanceSummary>>;
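/// Sorted (station name, upstream base URLs) pairs used to detect layout changes across reloads.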
type ServiceLayoutSignature = Vec<(String, Vec<String>)>;
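/// Circuit-breaker style health for a single provider endpoint, updated
/// passively from live request outcomes.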
#[derive(Debug, Clone, Default)]
struct ProviderEndpointRuntimeHealth {
failure_count: u32,
cooldown_until: Option<std::time::Instant>,
usage_exhausted: bool,
penalty_streak: u32,
last_good_at_ms: Option<u64>,
}
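/// Cached transcript-path lookup for a session; the timestamps drive TTL
/// expiry and LRU pruning.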
#[derive(Debug, Clone)]
struct SessionTranscriptPathCacheEntry {
path: Option<String>,
last_checked_ms: u64,
last_seen_ms: u64,
}
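/// TTL and capacity limits for the session-scoped caches, resolved once at
/// construction time.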
#[derive(Debug, Clone, Copy)]
struct RuntimePolicy {
session_override_ttl_ms: u64,
session_binding_ttl_ms: u64,
session_binding_max_entries: usize,
session_route_affinity_ttl_ms: u64,
session_route_affinity_max_entries: usize,
session_cwd_cache_ttl_ms: u64,
session_cwd_cache_max_entries: usize,
session_transcript_path_cache_ttl_ms: u64,
session_transcript_path_cache_max_entries: usize,
}
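/// Grouped parameters for [`ProxyState::record_passive_upstream_failure`].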
pub struct PassiveUpstreamFailureRecord {
pub service_name: String,
pub station_name: String,
pub base_url: String,
pub status_code: Option<u16>,
pub error_class: Option<String>,
pub error: Option<String>,
pub now_ms: u64,
}
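/// Maximum number of finished requests retained in memory, read once from
/// `CODEX_HELPER_RECENT_FINISHED_MAX` and clamped to `200..=20_000`
/// (default `2_000`).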
fn recent_finished_max() -> usize {
static MAX: OnceLock<usize> = OnceLock::new();
*MAX.get_or_init(|| {
std::env::var("CODEX_HELPER_RECENT_FINISHED_MAX")
.ok()
.and_then(|s| s.trim().parse::<usize>().ok())
.filter(|&n| n > 0)
.unwrap_or(2_000)
.clamp(200, 20_000)
})
}
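/// Milliseconds since the Unix epoch, saturating to `0` if the system
/// clock reads earlier than the epoch.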
fn unix_now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
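/// Evict the least-recently-seen entries until `cache` holds at most
/// `max_entries` items; `max_entries == 0` disables pruning entirely.
///
/// A minimal usage sketch:
///
/// ```ignore
/// let mut cache: HashMap<String, u64> = HashMap::new();
/// cache.insert("old".to_string(), 1);
/// cache.insert("new".to_string(), 9);
/// prune_lru_cache(&mut cache, 1, |&seen| seen);
/// assert!(cache.contains_key("new") && !cache.contains_key("old"));
/// ```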
fn prune_lru_cache<T>(
cache: &mut HashMap<String, T>,
max_entries: usize,
last_seen: impl Fn(&T) -> u64,
) {
if max_entries == 0 || cache.len() <= max_entries {
return;
}
let mut keys = cache
.iter()
.map(|(key, value)| (key.clone(), last_seen(value)))
.collect::<Vec<_>>();
keys.sort_by_key(|(_, seen)| *seen);
let remove_count = keys.len().saturating_sub(max_entries);
for (key, _) in keys.into_iter().take(remove_count) {
cache.remove(&key);
}
}
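/// Snapshot the station -> upstream layout of a service as a sorted
/// signature so reloads can cheaply detect which stations changed.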
fn service_layout_signature(mgr: &ServiceConfigManager) -> ServiceLayoutSignature {
let mut entries = mgr
.stations()
.iter()
.map(|(station_name, service)| {
(
station_name.clone(),
service
.upstreams
.iter()
.map(|upstream| upstream.base_url.clone())
.collect::<Vec<_>>(),
)
})
.collect::<Vec<_>>();
entries.sort_by(|left, right| left.0.cmp(&right.0).then_with(|| left.1.cmp(&right.1)));
entries
}
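/// Return the stations whose upstream list was added, removed, or modified
/// between two layout signatures (a symmetric diff on station names).
///
/// A minimal sketch:
///
/// ```ignore
/// let before = vec![("a".to_string(), vec!["u1".to_string()])];
/// let after = vec![("a".to_string(), vec!["u2".to_string()])];
/// assert!(changed_service_layout_stations(&before, &after).contains("a"));
/// ```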
fn changed_service_layout_stations(
previous: &ServiceLayoutSignature,
current: &ServiceLayoutSignature,
) -> HashSet<String> {
let previous_by_station = previous
.iter()
.map(|(station_name, upstreams)| (station_name.as_str(), upstreams.as_slice()))
.collect::<HashMap<_, _>>();
let current_by_station = current
.iter()
.map(|(station_name, upstreams)| (station_name.as_str(), upstreams.as_slice()))
.collect::<HashMap<_, _>>();
let mut changed = HashSet::new();
for (station_name, upstreams) in previous {
if current_by_station
.get(station_name.as_str())
.is_none_or(|current_upstreams| *current_upstreams != upstreams.as_slice())
{
changed.insert(station_name.clone());
}
}
for (station_name, upstreams) in current {
if previous_by_station
.get(station_name.as_str())
.is_none_or(|previous_upstreams| *previous_upstreams != upstreams.as_slice())
{
changed.insert(station_name.clone());
}
}
changed
}
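/// Shared in-memory runtime state for the proxy: per-session overrides and
/// bindings, route affinities, passive and active health, provider
/// balances, usage rollups, and observability caches. Every map is behind
/// a `tokio::sync::RwLock` so the state can be shared across request tasks
/// as an `Arc<ProxyState>`.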
#[derive(Debug)]
pub struct ProxyState {
next_request_id: AtomicU64,
session_override_ttl_ms: u64,
session_binding_ttl_ms: u64,
session_binding_max_entries: usize,
session_route_affinity_ttl_ms: u64,
session_route_affinity_max_entries: usize,
session_cwd_cache_ttl_ms: u64,
session_cwd_cache_max_entries: usize,
session_transcript_path_cache_ttl_ms: u64,
session_transcript_path_cache_max_entries: usize,
session_effort_overrides: RwLock<HashMap<String, SessionEffortOverride>>,
session_station_overrides: RwLock<HashMap<String, SessionStationOverride>>,
session_route_target_overrides: RwLock<HashMap<String, SessionRouteTargetOverride>>,
session_model_overrides: RwLock<HashMap<String, SessionModelOverride>>,
session_service_tier_overrides: RwLock<HashMap<String, SessionServiceTierOverride>>,
session_bindings: RwLock<HashMap<String, SessionBindingEntry>>,
session_route_affinities: RwLock<HashMap<String, SessionRouteAffinity>>,
global_station_override: RwLock<Option<String>>,
global_route_target_override: RwLock<Option<String>>,
runtime_default_profiles: RwLock<HashMap<String, RuntimeDefaultProfileOverride>>,
station_meta_overrides: RwLock<HashMap<String, HashMap<String, ConfigMetaOverride>>>,
provider_endpoint_meta_overrides:
RwLock<HashMap<String, HashMap<ProviderEndpointKey, ConfigMetaOverride>>>,
upstream_meta_overrides: RwLock<HashMap<String, HashMap<String, ConfigMetaOverride>>>,
session_cwd_cache: RwLock<HashMap<String, SessionCwdCacheEntry>>,
session_transcript_path_cache: RwLock<HashMap<String, SessionTranscriptPathCacheEntry>>,
session_stats: RwLock<HashMap<String, SessionStats>>,
active_requests: RwLock<HashMap<u64, ActiveRequest>>,
recent_finished: RwLock<VecDeque<FinishedRequest>>,
usage_rollups: RwLock<HashMap<String, UsageRollup>>,
station_health: RwLock<HashMap<String, HashMap<String, StationHealth>>>,
passive_station_health: RwLock<PassiveStationHealthMap>,
provider_balances: RwLock<ProviderBalanceMap>,
provider_balance_summaries: RwLock<ProviderBalanceSummaryMap>,
provider_endpoint_runtime_health:
RwLock<HashMap<String, HashMap<ProviderEndpointKey, ProviderEndpointRuntimeHealth>>>,
station_health_checks: RwLock<HashMap<String, HashMap<String, HealthCheckStatus>>>,
service_layout_signatures: RwLock<HashMap<String, ServiceLayoutSignature>>,
lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
}
impl ProxyState {
const MAX_HEALTH_RECORDS_PER_STATION: usize = 200;
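/// Build state with default policy and no shared load-balancer view.
///
/// A minimal sketch (hypothetical session id; needs a tokio runtime):
///
/// ```ignore
/// let state = ProxyState::new();
/// state
///     .set_session_effort_override("sess-1".to_string(), "high".to_string(), unix_now_ms())
///     .await;
/// assert_eq!(
///     state.get_session_effort_override("sess-1").await.as_deref(),
///     Some("high"),
/// );
/// ```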
#[allow(dead_code)]
pub fn new() -> Arc<Self> {
Self::new_with_lb_states(None)
}
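/// Build state, reading cache TTLs and capacities from the
/// `CODEX_HELPER_*` environment variables. The session-override TTL
/// defaults to 30 minutes; the binding and route-affinity TTLs default to
/// `0`, which is treated as "never expire".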
pub fn new_with_lb_states(
lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
) -> Arc<Self> {
let ttl_secs = std::env::var("CODEX_HELPER_SESSION_OVERRIDE_TTL_SECS")
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())
.filter(|&n| n > 0)
.unwrap_or(30 * 60);
let ttl_ms = ttl_secs.saturating_mul(1000);
let binding_ttl_secs = std::env::var("CODEX_HELPER_SESSION_BINDING_TTL_SECS")
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())
.unwrap_or(0);
let binding_ttl_ms = binding_ttl_secs.saturating_mul(1000);
let binding_max_entries = std::env::var("CODEX_HELPER_SESSION_BINDING_MAX_ENTRIES")
.ok()
.and_then(|s| s.trim().parse::<usize>().ok())
.unwrap_or(2_000);
let route_affinity_ttl_secs = std::env::var("CODEX_HELPER_SESSION_ROUTE_AFFINITY_TTL_SECS")
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())
.unwrap_or(0);
let route_affinity_ttl_ms = route_affinity_ttl_secs.saturating_mul(1000);
let route_affinity_max_entries =
std::env::var("CODEX_HELPER_SESSION_ROUTE_AFFINITY_MAX_ENTRIES")
.ok()
.and_then(|s| s.trim().parse::<usize>().ok())
.unwrap_or(5_000);
let cwd_cache_ttl_secs = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_TTL_SECS")
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())
.unwrap_or(12 * 60 * 60);
let cwd_cache_ttl_ms = cwd_cache_ttl_secs.saturating_mul(1000);
let cwd_cache_max_entries = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_MAX_ENTRIES")
.ok()
.and_then(|s| s.trim().parse::<usize>().ok())
.unwrap_or(2_000);
let transcript_path_cache_ttl_secs =
std::env::var("CODEX_HELPER_SESSION_TRANSCRIPT_PATH_CACHE_TTL_SECS")
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())
.unwrap_or(30);
let transcript_path_cache_ttl_ms = transcript_path_cache_ttl_secs.saturating_mul(1000);
let transcript_path_cache_max_entries =
std::env::var("CODEX_HELPER_SESSION_TRANSCRIPT_PATH_CACHE_MAX_ENTRIES")
.ok()
.and_then(|s| s.trim().parse::<usize>().ok())
.unwrap_or(5_000);
Self::new_with_runtime_policy(
lb_states,
RuntimePolicy {
session_override_ttl_ms: ttl_ms,
session_binding_ttl_ms: binding_ttl_ms,
session_binding_max_entries: binding_max_entries,
session_route_affinity_ttl_ms: route_affinity_ttl_ms,
session_route_affinity_max_entries: route_affinity_max_entries,
session_cwd_cache_ttl_ms: cwd_cache_ttl_ms,
session_cwd_cache_max_entries: cwd_cache_max_entries,
session_transcript_path_cache_ttl_ms: transcript_path_cache_ttl_ms,
session_transcript_path_cache_max_entries: transcript_path_cache_max_entries,
},
)
}
fn new_with_runtime_policy(
lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
policy: RuntimePolicy,
) -> Arc<Self> {
Arc::new(Self {
next_request_id: AtomicU64::new(1),
session_override_ttl_ms: policy.session_override_ttl_ms,
session_binding_ttl_ms: policy.session_binding_ttl_ms,
session_binding_max_entries: policy.session_binding_max_entries,
session_route_affinity_ttl_ms: policy.session_route_affinity_ttl_ms,
session_route_affinity_max_entries: policy.session_route_affinity_max_entries,
session_cwd_cache_ttl_ms: policy.session_cwd_cache_ttl_ms,
session_cwd_cache_max_entries: policy.session_cwd_cache_max_entries,
session_transcript_path_cache_ttl_ms: policy.session_transcript_path_cache_ttl_ms,
session_transcript_path_cache_max_entries: policy
.session_transcript_path_cache_max_entries,
session_effort_overrides: RwLock::new(HashMap::new()),
session_station_overrides: RwLock::new(HashMap::new()),
session_route_target_overrides: RwLock::new(HashMap::new()),
session_model_overrides: RwLock::new(HashMap::new()),
session_service_tier_overrides: RwLock::new(HashMap::new()),
session_bindings: RwLock::new(HashMap::new()),
session_route_affinities: RwLock::new(HashMap::new()),
global_station_override: RwLock::new(None),
global_route_target_override: RwLock::new(None),
runtime_default_profiles: RwLock::new(HashMap::new()),
station_meta_overrides: RwLock::new(HashMap::new()),
provider_endpoint_meta_overrides: RwLock::new(HashMap::new()),
upstream_meta_overrides: RwLock::new(HashMap::new()),
session_cwd_cache: RwLock::new(HashMap::new()),
session_transcript_path_cache: RwLock::new(HashMap::new()),
session_stats: RwLock::new(HashMap::new()),
active_requests: RwLock::new(HashMap::new()),
recent_finished: RwLock::new(VecDeque::new()),
usage_rollups: RwLock::new(HashMap::new()),
station_health: RwLock::new(HashMap::new()),
passive_station_health: RwLock::new(HashMap::new()),
provider_balances: RwLock::new(HashMap::new()),
provider_balance_summaries: RwLock::new(HashMap::new()),
provider_endpoint_runtime_health: RwLock::new(HashMap::new()),
station_health_checks: RwLock::new(HashMap::new()),
service_layout_signatures: RwLock::new(HashMap::new()),
lb_states,
})
}
pub async fn get_session_effort_override(&self, session_id: &str) -> Option<String> {
let guard = self.session_effort_overrides.read().await;
guard.get(session_id).map(|v| v.effort.clone())
}
pub async fn get_session_reasoning_effort_override(&self, session_id: &str) -> Option<String> {
self.get_session_effort_override(session_id).await
}
pub async fn set_session_effort_override(
&self,
session_id: String,
effort: String,
now_ms: u64,
) {
let mut guard = self.session_effort_overrides.write().await;
guard.insert(
session_id,
SessionEffortOverride {
effort,
updated_at_ms: now_ms,
last_seen_ms: now_ms,
},
);
}
pub async fn set_session_reasoning_effort_override(
&self,
session_id: String,
reasoning_effort: String,
now_ms: u64,
) {
self.set_session_effort_override(session_id, reasoning_effort, now_ms)
.await;
}
pub async fn clear_session_effort_override(&self, session_id: &str) {
let mut guard = self.session_effort_overrides.write().await;
guard.remove(session_id);
}
pub async fn clear_session_reasoning_effort_override(&self, session_id: &str) {
self.clear_session_effort_override(session_id).await;
}
pub async fn list_session_effort_overrides(&self) -> HashMap<String, String> {
let guard = self.session_effort_overrides.read().await;
guard
.iter()
.map(|(k, v)| (k.clone(), v.effort.clone()))
.collect()
}
pub async fn list_session_reasoning_effort_overrides(&self) -> HashMap<String, String> {
self.list_session_effort_overrides().await
}
pub async fn touch_session_override(&self, session_id: &str, now_ms: u64) {
let mut guard = self.session_effort_overrides.write().await;
if let Some(v) = guard.get_mut(session_id) {
v.last_seen_ms = now_ms;
}
}
pub async fn touch_session_reasoning_effort_override(&self, session_id: &str, now_ms: u64) {
self.touch_session_override(session_id, now_ms).await;
}
pub async fn get_session_station_override(&self, session_id: &str) -> Option<String> {
let guard = self.session_station_overrides.read().await;
guard.get(session_id).map(|v| v.station_name.clone())
}
pub async fn get_session_route_target_override(&self, session_id: &str) -> Option<String> {
let guard = self.session_route_target_overrides.read().await;
guard.get(session_id).map(|v| v.target.clone())
}
pub async fn get_session_model_override(&self, session_id: &str) -> Option<String> {
let guard = self.session_model_overrides.read().await;
guard.get(session_id).map(|v| v.model.clone())
}
pub async fn set_session_model_override(&self, session_id: String, model: String, now_ms: u64) {
let mut guard = self.session_model_overrides.write().await;
guard.insert(
session_id,
SessionModelOverride {
model,
updated_at_ms: now_ms,
last_seen_ms: now_ms,
},
);
}
pub async fn clear_session_model_override(&self, session_id: &str) {
let mut guard = self.session_model_overrides.write().await;
guard.remove(session_id);
}
pub async fn list_session_model_overrides(&self) -> HashMap<String, String> {
let guard = self.session_model_overrides.read().await;
guard
.iter()
.map(|(k, v)| (k.clone(), v.model.clone()))
.collect()
}
pub async fn touch_session_model_override(&self, session_id: &str, now_ms: u64) {
let mut guard = self.session_model_overrides.write().await;
if let Some(v) = guard.get_mut(session_id) {
v.last_seen_ms = now_ms;
}
}
pub async fn get_session_service_tier_override(&self, session_id: &str) -> Option<String> {
let guard = self.session_service_tier_overrides.read().await;
guard.get(session_id).map(|v| v.service_tier.clone())
}
pub async fn set_session_service_tier_override(
&self,
session_id: String,
service_tier: String,
now_ms: u64,
) {
let mut guard = self.session_service_tier_overrides.write().await;
guard.insert(
session_id,
SessionServiceTierOverride {
service_tier,
updated_at_ms: now_ms,
last_seen_ms: now_ms,
},
);
}
pub async fn clear_session_service_tier_override(&self, session_id: &str) {
let mut guard = self.session_service_tier_overrides.write().await;
guard.remove(session_id);
}
pub async fn list_session_service_tier_overrides(&self) -> HashMap<String, String> {
let guard = self.session_service_tier_overrides.read().await;
guard
.iter()
.map(|(k, v)| (k.clone(), v.service_tier.clone()))
.collect()
}
pub async fn touch_session_service_tier_override(&self, session_id: &str, now_ms: u64) {
let mut guard = self.session_service_tier_overrides.write().await;
if let Some(v) = guard.get_mut(session_id) {
v.last_seen_ms = now_ms;
}
}
pub async fn get_session_binding(&self, session_id: &str) -> Option<SessionBinding> {
let guard = self.session_bindings.read().await;
guard.get(session_id).map(|entry| entry.binding.clone())
}
pub async fn list_session_bindings(&self) -> HashMap<String, SessionBinding> {
let guard = self.session_bindings.read().await;
guard
.iter()
.map(|(sid, entry)| (sid.clone(), entry.binding.clone()))
.collect()
}
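/// Fetch a session's route affinity, lazily deleting it first when its TTL
/// has elapsed (hence the write lock on a read path).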
pub async fn get_session_route_affinity(
&self,
session_id: &str,
) -> Option<SessionRouteAffinity> {
let mut guard = self.session_route_affinities.write().await;
let affinity = guard.get(session_id).cloned()?;
if self.session_route_affinity_is_expired(&affinity, unix_now_ms()) {
guard.remove(session_id);
return None;
}
Some(affinity)
}
pub async fn list_session_route_affinities(&self) -> HashMap<String, SessionRouteAffinity> {
let mut guard = self.session_route_affinities.write().await;
let now_ms = unix_now_ms();
guard.retain(|_, affinity| !self.session_route_affinity_is_expired(affinity, now_ms));
guard.clone()
}
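/// Record a successful routing decision for a session. An unchanged target
/// only refreshes `last_selected_at_ms`; a new or changed target replaces
/// the affinity with the given (or a default) change reason, after which
/// the map is LRU-pruned.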
pub async fn record_session_route_affinity_success(
&self,
session_id: &str,
target: SessionRouteAffinityTarget,
reason_hint: Option<String>,
now_ms: u64,
) -> SessionRouteAffinity {
let mut guard = self.session_route_affinities.write().await;
let reason = match guard.get_mut(session_id) {
Some(existing) if target.same_target(existing) => {
existing.last_selected_at_ms = now_ms;
return existing.clone();
}
Some(_) => reason_hint.unwrap_or_else(|| "target_changed".to_string()),
None => reason_hint.unwrap_or_else(|| "first_success".to_string()),
};
let affinity = SessionRouteAffinity {
route_graph_key: target.route_graph_key,
provider_endpoint: target.provider_endpoint,
upstream_base_url: target.upstream_base_url,
route_path: target.route_path,
last_selected_at_ms: now_ms,
last_changed_at_ms: now_ms,
change_reason: reason,
};
guard.insert(session_id.to_string(), affinity.clone());
prune_lru_cache(
&mut guard,
self.session_route_affinity_max_entries,
|entry| entry.last_selected_at_ms,
);
affinity
}
fn session_route_affinity_is_expired(
&self,
affinity: &SessionRouteAffinity,
now_ms: u64,
) -> bool {
self.session_route_affinity_ttl_ms > 0
&& now_ms.saturating_sub(affinity.last_selected_at_ms)
>= self.session_route_affinity_ttl_ms
}
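/// Insert or replace a session binding, preserving the original
/// `created_at_ms` when the session is already bound.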
pub async fn set_session_binding(&self, binding: SessionBinding) {
let mut guard = self.session_bindings.write().await;
let binding = if let Some(existing) = guard.get(binding.session_id.as_str()) {
SessionBinding {
created_at_ms: existing.binding.created_at_ms,
..binding
}
} else {
binding
};
guard.insert(binding.session_id.clone(), SessionBindingEntry { binding });
}
pub async fn clear_session_binding(&self, session_id: &str) {
let mut guard = self.session_bindings.write().await;
guard.remove(session_id);
}
pub async fn clear_session_manual_overrides(&self, session_id: &str) {
self.clear_session_station_override(session_id).await;
self.clear_session_route_target_override(session_id).await;
self.clear_session_model_override(session_id).await;
self.clear_session_effort_override(session_id).await;
self.clear_session_service_tier_override(session_id).await;
}
pub async fn get_session_manual_overrides(&self, session_id: &str) -> SessionManualOverrides {
let (reasoning_effort, station_name, route_target, model, service_tier) = tokio::join!(
self.get_session_reasoning_effort_override(session_id),
self.get_session_station_override(session_id),
self.get_session_route_target_override(session_id),
self.get_session_model_override(session_id),
self.get_session_service_tier_override(session_id),
);
SessionManualOverrides {
reasoning_effort,
station_name,
route_target,
model,
service_tier,
}
}
pub async fn list_session_manual_overrides(&self) -> HashMap<String, SessionManualOverrides> {
let (reasoning_effort_map, station_map, route_target_map, model_map, service_tier_map) = tokio::join!(
self.list_session_reasoning_effort_overrides(),
self.list_session_station_overrides(),
self.list_session_route_target_overrides(),
self.list_session_model_overrides(),
self.list_session_service_tier_overrides(),
);
let mut merged = HashMap::<String, SessionManualOverrides>::new();
for (session_id, reasoning_effort) in reasoning_effort_map {
merged.entry(session_id).or_default().reasoning_effort = Some(reasoning_effort);
}
for (session_id, station_name) in station_map {
merged.entry(session_id).or_default().station_name = Some(station_name);
}
for (session_id, route_target) in route_target_map {
merged.entry(session_id).or_default().route_target = Some(route_target);
}
for (session_id, model) in model_map {
merged.entry(session_id).or_default().model = Some(model);
}
for (session_id, service_tier) in service_tier_map {
merged.entry(session_id).or_default().service_tier = Some(service_tier);
}
merged.retain(|_, overrides| !overrides.is_empty());
merged
}
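/// Resolve and validate `profile_name` for the service, bind the session
/// to that profile, and clear any per-session manual overrides so the
/// profile takes effect cleanly.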
pub async fn apply_session_profile_binding(
&self,
service_name: &str,
mgr: &ServiceConfigManager,
session_id: String,
profile_name: String,
now_ms: u64,
) -> anyhow::Result<()> {
let profile = crate::config::resolve_service_profile(mgr, profile_name.as_str())?;
crate::config::validate_profile_station_compatibility(
service_name,
mgr,
profile_name.as_str(),
&profile,
)?;
self.set_session_binding(SessionBinding {
session_id: session_id.clone(),
profile_name: Some(profile_name),
station_name: profile.station.clone(),
model: profile.model.clone(),
reasoning_effort: profile.reasoning_effort.clone(),
service_tier: profile.service_tier.clone(),
continuity_mode: SessionContinuityMode::ManualProfile,
created_at_ms: now_ms,
updated_at_ms: now_ms,
last_seen_ms: now_ms,
})
.await;
self.clear_session_manual_overrides(session_id.as_str())
.await;
Ok(())
}
pub async fn touch_session_binding(&self, session_id: &str, now_ms: u64) {
let mut guard = self.session_bindings.write().await;
if let Some(entry) = guard.get_mut(session_id) {
entry.binding.last_seen_ms = now_ms;
}
}
pub async fn set_session_station_override(
&self,
session_id: String,
station_name: String,
now_ms: u64,
) {
let mut guard = self.session_station_overrides.write().await;
guard.insert(
session_id,
SessionStationOverride {
station_name,
updated_at_ms: now_ms,
last_seen_ms: now_ms,
},
);
}
pub async fn set_session_route_target_override(
&self,
session_id: String,
target: String,
now_ms: u64,
) {
let mut guard = self.session_route_target_overrides.write().await;
guard.insert(
session_id,
SessionRouteTargetOverride {
target,
updated_at_ms: now_ms,
last_seen_ms: now_ms,
},
);
}
pub async fn clear_session_station_override(&self, session_id: &str) {
let mut guard = self.session_station_overrides.write().await;
guard.remove(session_id);
}
pub async fn clear_session_route_target_override(&self, session_id: &str) {
let mut guard = self.session_route_target_overrides.write().await;
guard.remove(session_id);
}
pub async fn list_session_station_overrides(&self) -> HashMap<String, String> {
let guard = self.session_station_overrides.read().await;
guard
.iter()
.map(|(k, v)| (k.clone(), v.station_name.clone()))
.collect()
}
pub async fn list_session_route_target_overrides(&self) -> HashMap<String, String> {
let guard = self.session_route_target_overrides.read().await;
guard
.iter()
.map(|(k, v)| (k.clone(), v.target.clone()))
.collect()
}
pub async fn touch_session_station_override(&self, session_id: &str, now_ms: u64) {
let mut guard = self.session_station_overrides.write().await;
if let Some(v) = guard.get_mut(session_id) {
v.last_seen_ms = now_ms;
}
}
pub async fn touch_session_route_target_override(&self, session_id: &str, now_ms: u64) {
let mut guard = self.session_route_target_overrides.write().await;
if let Some(v) = guard.get_mut(session_id) {
v.last_seen_ms = now_ms;
}
}
pub async fn get_global_station_override(&self) -> Option<String> {
let guard = self.global_station_override.read().await;
guard.clone()
}
pub async fn get_global_route_target_override(&self) -> Option<String> {
let guard = self.global_route_target_override.read().await;
guard.clone()
}
pub async fn set_global_station_override(&self, station_name: String, _now_ms: u64) {
let mut guard = self.global_station_override.write().await;
*guard = Some(station_name);
}
pub async fn set_global_route_target_override(&self, target: String, _now_ms: u64) {
let mut guard = self.global_route_target_override.write().await;
*guard = Some(target);
}
pub async fn clear_global_station_override(&self) {
let mut guard = self.global_station_override.write().await;
*guard = None;
}
pub async fn clear_global_route_target_override(&self) {
let mut guard = self.global_route_target_override.write().await;
*guard = None;
}
pub async fn get_runtime_default_profile_override(&self, service_name: &str) -> Option<String> {
let guard = self.runtime_default_profiles.read().await;
guard
.get(service_name)
.map(|entry| entry.profile_name.clone())
}
pub async fn set_runtime_default_profile_override(
&self,
service_name: String,
profile_name: String,
now_ms: u64,
) {
let mut guard = self.runtime_default_profiles.write().await;
guard.insert(
service_name,
RuntimeDefaultProfileOverride {
profile_name,
updated_at_ms: now_ms,
},
);
}
pub async fn clear_runtime_default_profile_override(&self, service_name: &str) {
let mut guard = self.runtime_default_profiles.write().await;
guard.remove(service_name);
}
pub async fn set_station_enabled_override(
&self,
service_name: &str,
station_name: String,
enabled: bool,
now_ms: u64,
) {
let mut guard = self.station_meta_overrides.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let entry = per_service.entry(station_name).or_default();
entry.enabled = Some(enabled);
entry.updated_at_ms = now_ms;
}
pub async fn set_station_level_override(
&self,
service_name: &str,
station_name: String,
level: u8,
now_ms: u64,
) {
let mut guard = self.station_meta_overrides.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let entry = per_service.entry(station_name).or_default();
entry.level = Some(level.clamp(1, 10));
entry.updated_at_ms = now_ms;
}
pub async fn set_station_runtime_state_override(
&self,
service_name: &str,
station_name: String,
state: RuntimeConfigState,
now_ms: u64,
) {
let mut guard = self.station_meta_overrides.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let entry = per_service.entry(station_name).or_default();
entry.state = Some(state);
entry.updated_at_ms = now_ms;
}
pub async fn clear_station_enabled_override(&self, service_name: &str, station_name: &str) {
let mut guard = self.station_meta_overrides.write().await;
let Some(per_service) = guard.get_mut(service_name) else {
return;
};
let Some(entry) = per_service.get_mut(station_name) else {
return;
};
entry.enabled = None;
if entry.level.is_none() && entry.state.is_none() {
per_service.remove(station_name);
}
if per_service.is_empty() {
guard.remove(service_name);
}
}
pub async fn clear_station_level_override(&self, service_name: &str, station_name: &str) {
let mut guard = self.station_meta_overrides.write().await;
let Some(per_service) = guard.get_mut(service_name) else {
return;
};
let Some(entry) = per_service.get_mut(station_name) else {
return;
};
entry.level = None;
if entry.enabled.is_none() && entry.state.is_none() {
per_service.remove(station_name);
}
if per_service.is_empty() {
guard.remove(service_name);
}
}
pub async fn clear_station_runtime_state_override(
&self,
service_name: &str,
station_name: &str,
) {
let mut guard = self.station_meta_overrides.write().await;
let Some(per_service) = guard.get_mut(service_name) else {
return;
};
let Some(entry) = per_service.get_mut(station_name) else {
return;
};
entry.state = None;
if entry.enabled.is_none() && entry.level.is_none() {
per_service.remove(station_name);
}
if per_service.is_empty() {
guard.remove(service_name);
}
}
pub async fn get_station_meta_overrides(
&self,
service_name: &str,
) -> HashMap<String, (Option<bool>, Option<u8>)> {
let guard = self.station_meta_overrides.read().await;
guard
.get(service_name)
.map(|m| {
m.iter()
.map(|(k, v)| (k.clone(), (v.enabled, v.level)))
.collect::<HashMap<_, _>>()
})
.unwrap_or_default()
}
pub async fn get_station_runtime_state_overrides(
&self,
service_name: &str,
) -> HashMap<String, RuntimeConfigState> {
let guard = self.station_meta_overrides.read().await;
guard
.get(service_name)
.map(|m| {
m.iter()
.filter_map(|(k, v)| v.state.map(|state| (k.clone(), state)))
.collect::<HashMap<_, _>>()
})
.unwrap_or_default()
}
pub async fn set_provider_endpoint_enabled_override(
&self,
service_name: &str,
endpoint_key: ProviderEndpointKey,
enabled: bool,
now_ms: u64,
) {
let mut guard = self.provider_endpoint_meta_overrides.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let entry = per_service.entry(endpoint_key).or_default();
entry.enabled = Some(enabled);
entry.updated_at_ms = now_ms;
}
pub async fn clear_provider_endpoint_enabled_override(
&self,
service_name: &str,
endpoint_key: &ProviderEndpointKey,
) {
let mut guard = self.provider_endpoint_meta_overrides.write().await;
let Some(per_service) = guard.get_mut(service_name) else {
return;
};
let Some(entry) = per_service.get_mut(endpoint_key) else {
return;
};
entry.enabled = None;
if entry.level.is_none() && entry.state.is_none() {
per_service.remove(endpoint_key);
}
if per_service.is_empty() {
guard.remove(service_name);
}
}
pub async fn set_provider_endpoint_runtime_state_override(
&self,
service_name: &str,
endpoint_key: ProviderEndpointKey,
state: RuntimeConfigState,
now_ms: u64,
) {
let mut guard = self.provider_endpoint_meta_overrides.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let entry = per_service.entry(endpoint_key).or_default();
entry.state = Some(state);
entry.updated_at_ms = now_ms;
}
pub async fn clear_provider_endpoint_runtime_state_override(
&self,
service_name: &str,
endpoint_key: &ProviderEndpointKey,
) {
let mut guard = self.provider_endpoint_meta_overrides.write().await;
let Some(per_service) = guard.get_mut(service_name) else {
return;
};
let Some(entry) = per_service.get_mut(endpoint_key) else {
return;
};
entry.state = None;
if entry.enabled.is_none() && entry.level.is_none() {
per_service.remove(endpoint_key);
}
if per_service.is_empty() {
guard.remove(service_name);
}
}
pub async fn set_upstream_enabled_override(
&self,
service_name: &str,
base_url: String,
enabled: bool,
now_ms: u64,
) {
let mut guard = self.upstream_meta_overrides.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let entry = per_service.entry(base_url).or_default();
entry.enabled = Some(enabled);
entry.updated_at_ms = now_ms;
}
pub async fn clear_upstream_enabled_override(&self, service_name: &str, base_url: &str) {
let mut guard = self.upstream_meta_overrides.write().await;
let Some(per_service) = guard.get_mut(service_name) else {
return;
};
let Some(entry) = per_service.get_mut(base_url) else {
return;
};
entry.enabled = None;
if entry.level.is_none() && entry.state.is_none() {
per_service.remove(base_url);
}
if per_service.is_empty() {
guard.remove(service_name);
}
}
pub async fn set_upstream_runtime_state_override(
&self,
service_name: &str,
base_url: String,
state: RuntimeConfigState,
now_ms: u64,
) {
let mut guard = self.upstream_meta_overrides.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let entry = per_service.entry(base_url).or_default();
entry.state = Some(state);
entry.updated_at_ms = now_ms;
}
pub async fn clear_upstream_runtime_state_override(&self, service_name: &str, base_url: &str) {
let mut guard = self.upstream_meta_overrides.write().await;
let Some(per_service) = guard.get_mut(service_name) else {
return;
};
let Some(entry) = per_service.get_mut(base_url) else {
return;
};
entry.state = None;
if entry.enabled.is_none() && entry.level.is_none() {
per_service.remove(base_url);
}
if per_service.is_empty() {
guard.remove(service_name);
}
}
pub async fn get_upstream_meta_overrides(
&self,
service_name: &str,
) -> HashMap<String, (Option<bool>, Option<RuntimeConfigState>)> {
let mut overrides = HashMap::new();
{
let guard = self.upstream_meta_overrides.read().await;
if let Some(per_service) = guard.get(service_name) {
overrides.extend(
per_service
.iter()
.map(|(k, v)| (k.clone(), (v.enabled, v.state))),
);
}
}
{
let guard = self.provider_endpoint_meta_overrides.read().await;
if let Some(per_service) = guard.get(service_name) {
overrides.extend(
per_service
.iter()
.map(|(k, v)| (k.stable_key(), (v.enabled, v.state))),
);
}
}
overrides
}
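/// Build the route-plan runtime view for a service: expire elapsed
/// cooldowns, export each endpoint's failure/cooldown/usage-exhaustion
/// state, hint the most recently successful endpoint as the affinity, and
/// finally mark manually disabled or non-normal endpoints as
/// `runtime_disabled`.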
pub async fn route_plan_runtime_state_for_provider_endpoints(
&self,
service_name: &str,
) -> RoutePlanRuntimeState {
let mut runtime = RoutePlanRuntimeState::default();
let now = std::time::Instant::now();
{
let mut guard = self.provider_endpoint_runtime_health.write().await;
if let Some(per_service) = guard.get_mut(service_name) {
let mut affinity: Option<(ProviderEndpointKey, u64)> = None;
for (endpoint_key, health) in per_service.iter_mut() {
if health.cooldown_until.is_some_and(|until| now >= until) {
health.failure_count = 0;
health.cooldown_until = None;
}
let cooldown_active = health.cooldown_until.is_some_and(|until| now < until);
runtime.set_provider_endpoint(
endpoint_key.clone(),
RoutePlanUpstreamRuntimeState {
runtime_disabled: false,
failure_count: health.failure_count,
cooldown_active,
usage_exhausted: health.usage_exhausted,
missing_auth: false,
},
);
if let Some(last_good_at_ms) = health.last_good_at_ms
&& affinity
.as_ref()
.is_none_or(|(_, current)| last_good_at_ms > *current)
{
affinity = Some((endpoint_key.clone(), last_good_at_ms));
}
}
if let Some((endpoint_key, _)) = affinity {
runtime.set_affinity_provider_endpoint(Some(endpoint_key));
}
}
}
{
let guard = self.provider_endpoint_meta_overrides.read().await;
if let Some(per_service) = guard.get(service_name) {
for (endpoint_key, meta) in per_service {
let mut upstream_state = runtime.provider_endpoint(endpoint_key);
if meta.enabled == Some(false)
|| meta
.state
.is_some_and(|state| state != RuntimeConfigState::Normal)
{
upstream_state.runtime_disabled = true;
}
runtime.set_provider_endpoint(endpoint_key.clone(), upstream_state);
}
}
}
runtime
}
pub async fn record_provider_endpoint_attempt_success(
&self,
service_name: &str,
endpoint_key: ProviderEndpointKey,
now_ms: u64,
) {
let mut guard = self.provider_endpoint_runtime_health.write().await;
let entry = guard
.entry(service_name.to_string())
.or_default()
.entry(endpoint_key)
.or_default();
entry.failure_count = 0;
entry.cooldown_until = None;
entry.penalty_streak = 0;
entry.last_good_at_ms = Some(now_ms);
}
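/// Count a failed attempt; once `FAILURE_THRESHOLD` is reached, start or
/// extend a cooldown whose length grows with the penalty streak via
/// `cooldown_backoff`.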
pub async fn record_provider_endpoint_attempt_failure(
&self,
service_name: &str,
endpoint_key: ProviderEndpointKey,
failure_threshold_cooldown_secs: u64,
cooldown_backoff: CooldownBackoff,
) {
let mut guard = self.provider_endpoint_runtime_health.write().await;
let entry = guard
.entry(service_name.to_string())
.or_default()
.entry(endpoint_key)
.or_default();
entry.failure_count = entry.failure_count.saturating_add(1);
if entry.failure_count >= FAILURE_THRESHOLD {
let base_secs = if failure_threshold_cooldown_secs == 0 {
COOLDOWN_SECS
} else {
failure_threshold_cooldown_secs
};
let effective_secs =
cooldown_backoff.effective_cooldown_secs(base_secs, entry.penalty_streak);
let now = std::time::Instant::now();
let new_until = now + std::time::Duration::from_secs(effective_secs);
if entry
.cooldown_until
.is_none_or(|existing| new_until > existing)
{
entry.cooldown_until = Some(new_until);
}
entry.penalty_streak = entry.penalty_streak.saturating_add(1);
entry.last_good_at_ms = None;
}
}
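/// Put an endpoint straight into cooldown without waiting for the failure
/// threshold; callers decide when such an immediate penalty applies.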
pub async fn penalize_provider_endpoint_attempt(
&self,
service_name: &str,
endpoint_key: ProviderEndpointKey,
cooldown_secs: u64,
cooldown_backoff: CooldownBackoff,
) {
let mut guard = self.provider_endpoint_runtime_health.write().await;
let entry = guard
.entry(service_name.to_string())
.or_default()
.entry(endpoint_key)
.or_default();
let effective_secs =
cooldown_backoff.effective_cooldown_secs(cooldown_secs, entry.penalty_streak);
entry.failure_count = FAILURE_THRESHOLD;
entry.cooldown_until =
Some(std::time::Instant::now() + std::time::Duration::from_secs(effective_secs));
entry.penalty_streak = entry.penalty_streak.saturating_add(1);
entry.last_good_at_ms = None;
}
pub async fn set_provider_endpoint_usage_exhausted(
&self,
service_name: &str,
endpoint_key: ProviderEndpointKey,
exhausted: bool,
) {
let mut guard = self.provider_endpoint_runtime_health.write().await;
let entry = guard
.entry(service_name.to_string())
.or_default()
.entry(endpoint_key)
.or_default();
entry.usage_exhausted = exhausted;
}
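/// Drop runtime state that refers to stations, upstreams, providers, or
/// provider endpoints absent from the reloaded config, and prune balance
/// data for stations whose upstream layout changed since the last
/// signature.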
pub async fn prune_runtime_observability_for_service(
&self,
service_name: &str,
mgr: &ServiceConfigManager,
) {
let active_stations = mgr.stations().keys().cloned().collect::<HashSet<_>>();
let active_upstreams = mgr
.stations()
.iter()
.map(|(station_name, service)| {
(
station_name.clone(),
service
.upstreams
.iter()
.map(|upstream| upstream.base_url.clone())
.collect::<HashSet<_>>(),
)
})
.collect::<HashMap<_, _>>();
let active_base_urls = active_upstreams
.values()
.flat_map(|upstreams| upstreams.iter().cloned())
.collect::<HashSet<_>>();
let active_provider_endpoint_keys = mgr
.stations()
.iter()
.flat_map(|(station_name, service)| {
service.upstreams.iter().enumerate().map(|(idx, upstream)| {
Self::active_provider_endpoint_key_for_upstream(
service_name,
station_name.as_str(),
idx,
upstream,
)
})
})
.collect::<HashSet<_>>();
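// "-" is the sentinel provider id used when usage cannot be attributed;
// keep it so those rollups survive pruning.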
let mut active_provider_ids = HashSet::from(["-".to_string()]);
for service in mgr.stations().values() {
for upstream in &service.upstreams {
if let Some(provider_id) = upstream.tags.get("provider_id") {
active_provider_ids.insert(provider_id.clone());
}
}
}
let layout = service_layout_signature(mgr);
let balance_prune_stations = {
let mut signatures = self.service_layout_signatures.write().await;
let changed = signatures.get(service_name).map_or_else(
|| {
if active_stations.len() == 1 && active_stations.contains("routing") {
Some(HashSet::from(["routing".to_string()]))
} else {
None
}
},
|previous| Some(changed_service_layout_stations(previous, &layout)),
);
signatures.insert(service_name.to_string(), layout);
changed
};
match balance_prune_stations {
Some(changed_layout_stations) if !changed_layout_stations.is_empty() => {
let mut provider_balances = self.provider_balances.write().await;
if let Some(per_service) = provider_balances.get_mut(service_name) {
per_service
.retain(|station_name, _| !changed_layout_stations.contains(station_name));
if per_service.is_empty() {
provider_balances.remove(service_name);
}
}
let mut provider_balance_summaries = self.provider_balance_summaries.write().await;
if let Some(per_service) = provider_balance_summaries.get_mut(service_name) {
per_service
.retain(|station_name, _| !changed_layout_stations.contains(station_name));
if per_service.is_empty() {
provider_balance_summaries.remove(service_name);
}
}
}
None => {
let mut provider_balances = self.provider_balances.write().await;
provider_balances.remove(service_name);
let mut provider_balance_summaries = self.provider_balance_summaries.write().await;
provider_balance_summaries.remove(service_name);
}
Some(_) => {}
}
{
let mut guard = self.station_meta_overrides.write().await;
if let Some(per_service) = guard.get_mut(service_name) {
per_service.retain(|station_name, _| active_stations.contains(station_name));
if per_service.is_empty() {
guard.remove(service_name);
}
}
}
{
let mut guard = self.upstream_meta_overrides.write().await;
if let Some(per_service) = guard.get_mut(service_name) {
per_service.retain(|base_url, _| active_base_urls.contains(base_url));
if per_service.is_empty() {
guard.remove(service_name);
}
}
}
{
let mut guard = self.provider_endpoint_meta_overrides.write().await;
if let Some(per_service) = guard.get_mut(service_name) {
per_service
.retain(|endpoint_key, _| active_provider_endpoint_keys.contains(endpoint_key));
if per_service.is_empty() {
guard.remove(service_name);
}
}
}
{
let mut guard = self.provider_endpoint_runtime_health.write().await;
if let Some(per_service) = guard.get_mut(service_name) {
per_service
.retain(|endpoint_key, _| active_provider_endpoint_keys.contains(endpoint_key));
if per_service.is_empty() {
guard.remove(service_name);
}
}
}
{
let mut guard = self.station_health.write().await;
if let Some(per_service) = guard.get_mut(service_name) {
per_service.retain(|station_name, station_health| {
if !active_stations.contains(station_name) {
return false;
}
if let Some(allowed_upstreams) = active_upstreams.get(station_name) {
station_health
.upstreams
.retain(|upstream| allowed_upstreams.contains(&upstream.base_url));
}
!station_health.upstreams.is_empty()
});
if per_service.is_empty() {
guard.remove(service_name);
}
}
}
{
let mut guard = self.passive_station_health.write().await;
if let Some(per_service) = guard.get_mut(service_name) {
per_service.retain(|station_name, station_health| {
if !active_stations.contains(station_name) {
return false;
}
if let Some(allowed_upstreams) = active_upstreams.get(station_name) {
station_health.retain(|base_url, _| allowed_upstreams.contains(base_url));
}
!station_health.is_empty()
});
if per_service.is_empty() {
guard.remove(service_name);
}
}
}
{
let mut guard = self.station_health_checks.write().await;
if let Some(per_service) = guard.get_mut(service_name) {
per_service.retain(|station_name, _| active_stations.contains(station_name));
if per_service.is_empty() {
guard.remove(service_name);
}
}
}
{
let mut guard = self.usage_rollups.write().await;
if let Some(rollup) = guard.get_mut(service_name) {
rollup
.by_config
.retain(|station_name, _| active_stations.contains(station_name));
rollup
.by_config_day
.retain(|station_name, _| active_stations.contains(station_name));
rollup
.by_provider
.retain(|provider_id, _| active_provider_ids.contains(provider_id));
rollup
.by_provider_day
.retain(|provider_id, _| active_provider_ids.contains(provider_id));
}
}
}
fn active_provider_endpoint_key_for_upstream(
service_name: &str,
station_name: &str,
upstream_index: usize,
upstream: &crate::config::UpstreamConfig,
) -> ProviderEndpointKey {
let provider_id = upstream
.tags
.get("provider_id")
.cloned()
.unwrap_or_else(|| format!("{station_name}#{upstream_index}"));
let endpoint_id = upstream
.tags
.get("endpoint_id")
.cloned()
.unwrap_or_else(|| upstream_index.to_string());
ProviderEndpointKey::new(service_name, provider_id, endpoint_id)
}
pub async fn record_station_health(
&self,
service_name: &str,
station_name: String,
health: StationHealth,
) {
let mut guard = self.station_health.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
per_service.insert(station_name, health);
}
pub async fn get_station_health(&self, service_name: &str) -> HashMap<String, StationHealth> {
let active = {
let guard = self.station_health.read().await;
guard.get(service_name).cloned().unwrap_or_default()
};
let passive = {
let guard = self.passive_station_health.read().await;
guard.get(service_name).cloned().unwrap_or_default()
};
merge_station_health(active, passive)
}
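/// Store a balance snapshot in its station/upstream/provider slot and
/// recompute that station's routing-balance summary. Snapshots missing a
/// station name or upstream index are ignored.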
pub async fn record_provider_balance_snapshot(
&self,
service_name: &str,
mut snapshot: ProviderBalanceSnapshot,
) {
let (Some(station_name), Some(upstream_index)) =
(snapshot.station_name.clone(), snapshot.upstream_index)
else {
return;
};
let now_ms = unix_now_ms();
snapshot.refresh_status(now_ms);
let station_summary = {
let mut guard = self.provider_balances.write().await;
let station_balances = guard
.entry(service_name.to_string())
.or_default()
.entry(station_name.clone())
.or_default();
station_balances
.entry(upstream_index)
.or_default()
.insert(snapshot.provider_id.clone(), snapshot);
StationRoutingBalanceSummary::from_snapshot_iter_at(
station_balances
.values()
.flat_map(|providers| providers.values()),
now_ms,
)
};
let mut summaries = self.provider_balance_summaries.write().await;
summaries
.entry(service_name.to_string())
.or_default()
.insert(station_name, station_summary);
}
pub async fn get_provider_balance_view(
&self,
service_name: &str,
) -> HashMap<String, Vec<ProviderBalanceSnapshot>> {
let now_ms = unix_now_ms();
let guard = self.provider_balances.read().await;
let Some(per_service) = guard.get(service_name) else {
return HashMap::new();
};
per_service
.iter()
.map(|(station_name, upstreams)| {
let mut snapshots = upstreams
.values()
.flat_map(|providers| providers.values().cloned())
.collect::<Vec<_>>();
for snapshot in &mut snapshots {
snapshot.refresh_status(now_ms);
}
snapshots.sort_by(|a, b| {
a.upstream_index
.cmp(&b.upstream_index)
.then_with(|| a.provider_id.cmp(&b.provider_id))
});
(station_name.clone(), snapshots)
})
.collect()
}
pub async fn get_provider_balance_summary_view(
&self,
service_name: &str,
) -> HashMap<String, StationRoutingBalanceSummary> {
let guard = self.provider_balance_summaries.read().await;
let Some(per_service) = guard.get(service_name) else {
return HashMap::new();
};
per_service.clone()
}
pub async fn record_passive_upstream_success(
&self,
service_name: &str,
station_name: &str,
base_url: &str,
status_code: Option<u16>,
now_ms: u64,
) {
let mut guard = self.passive_station_health.write().await;
let entry = guard
.entry(service_name.to_string())
.or_default()
.entry(station_name.to_string())
.or_default()
.entry(base_url.to_string())
.or_default();
entry.record_success(now_ms, status_code);
}
pub async fn record_passive_upstream_failure(&self, params: PassiveUpstreamFailureRecord) {
let PassiveUpstreamFailureRecord {
service_name,
station_name,
base_url,
status_code,
error_class,
error,
now_ms,
} = params;
let mut guard = self.passive_station_health.write().await;
let entry = guard
.entry(service_name)
.or_default()
.entry(station_name)
.or_default()
.entry(base_url)
.or_default();
entry.record_failure(now_ms, status_code, error_class, error);
}
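/// Snapshot the shared load-balancer state, recovering from a poisoned
/// mutex and padding the per-upstream vectors to a common length before
/// reporting failure counts, remaining cooldown seconds, and usage
/// exhaustion per upstream.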
pub async fn get_lb_view(&self) -> HashMap<String, LbConfigView> {
let Some(lb_states) = self.lb_states.as_ref() else {
return HashMap::new();
};
let mut map = match lb_states.lock() {
Ok(m) => m,
Err(e) => e.into_inner(),
};
let now = std::time::Instant::now();
let mut out = HashMap::new();
for (cfg_name, st) in map.iter_mut() {
let len = st
.failure_counts
.len()
.max(st.cooldown_until.len())
.max(st.usage_exhausted.len());
if len == 0 {
continue;
}
if st.failure_counts.len() != len {
st.failure_counts.resize(len, 0);
}
if st.cooldown_until.len() != len {
st.cooldown_until.resize(len, None);
}
if st.usage_exhausted.len() != len {
st.usage_exhausted.resize(len, false);
}
let mut upstreams = Vec::with_capacity(len);
for idx in 0..len {
let failure_count = st.failure_counts.get(idx).copied().unwrap_or(0);
let cooldown_remaining_secs = st
.cooldown_until
.get(idx)
.and_then(|v| *v)
.map(|until| until.saturating_duration_since(now).as_secs())
.filter(|&s| s > 0);
let usage_exhausted = st.usage_exhausted.get(idx).copied().unwrap_or(false);
upstreams.push(LbUpstreamView {
failure_count,
cooldown_remaining_secs,
usage_exhausted,
});
}
out.insert(
cfg_name.clone(),
LbConfigView {
last_good_index: st.last_good_index,
upstreams,
},
);
}
out
}
pub async fn list_health_checks(
&self,
service_name: &str,
) -> HashMap<String, HealthCheckStatus> {
let guard = self.station_health_checks.read().await;
guard.get(service_name).cloned().unwrap_or_default()
}
pub async fn try_begin_station_health_check(
&self,
service_name: &str,
station_name: &str,
total: usize,
now_ms: u64,
) -> bool {
let mut guard = self.station_health_checks.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
if let Some(existing) = per_service.get(station_name)
&& !existing.done
{
return false;
}
per_service.insert(
station_name.to_string(),
HealthCheckStatus {
started_at_ms: now_ms,
updated_at_ms: now_ms,
total: total.min(u32::MAX as usize) as u32,
completed: 0,
ok: 0,
err: 0,
cancel_requested: false,
canceled: false,
done: false,
last_error: None,
},
);
true
}
pub async fn request_cancel_station_health_check(
&self,
service_name: &str,
station_name: &str,
now_ms: u64,
) -> bool {
let mut guard = self.station_health_checks.write().await;
let Some(per_service) = guard.get_mut(service_name) else {
return false;
};
let Some(st) = per_service.get_mut(station_name) else {
return false;
};
if st.done {
return false;
}
st.cancel_requested = true;
st.updated_at_ms = now_ms;
true
}
pub async fn is_station_health_check_cancel_requested(
&self,
service_name: &str,
station_name: &str,
) -> bool {
let guard = self.station_health_checks.read().await;
guard
.get(service_name)
.and_then(|m| m.get(station_name))
.is_some_and(|s| s.cancel_requested && !s.done)
}
pub async fn record_station_health_check_result(
&self,
service_name: &str,
station_name: &str,
now_ms: u64,
upstream: UpstreamHealth,
) {
{
let mut guard = self.station_health.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let entry = per_service
.entry(station_name.to_string())
.or_insert_with(|| StationHealth {
checked_at_ms: now_ms,
upstreams: Vec::new(),
});
entry.checked_at_ms = entry.checked_at_ms.max(now_ms);
entry.upstreams.push(upstream.clone());
if entry.upstreams.len() > Self::MAX_HEALTH_RECORDS_PER_STATION {
let extra = entry
.upstreams
.len()
.saturating_sub(Self::MAX_HEALTH_RECORDS_PER_STATION);
entry.upstreams.drain(0..extra);
}
}
let mut guard = self.station_health_checks.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let st = per_service.entry(station_name.to_string()).or_default();
st.updated_at_ms = now_ms;
st.completed = st.completed.saturating_add(1);
match upstream.ok {
Some(true) => st.ok = st.ok.saturating_add(1),
Some(false) => {
st.err = st.err.saturating_add(1);
if st.last_error.is_none() {
st.last_error = upstream.error.clone();
}
}
None => {}
}
}
pub async fn finish_station_health_check(
&self,
service_name: &str,
station_name: &str,
now_ms: u64,
canceled: bool,
) {
let mut guard = self.station_health_checks.write().await;
let per_service = guard.entry(service_name.to_string()).or_default();
let st = per_service.entry(station_name.to_string()).or_default();
st.updated_at_ms = now_ms;
st.canceled = canceled;
st.done = true;
}
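/// Aggregate usage into a windowed view over the last `days` days
/// (`days == 0` means everything currently loaded), including top-`top_n`
/// breakdowns by station config and by provider plus per-day series for
/// each listed entity.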
pub async fn get_usage_rollup_view(
&self,
service_name: &str,
top_n: usize,
days: usize,
) -> UsageRollupView {
let guard = self.usage_rollups.read().await;
let Some(rollup) = guard.get(service_name) else {
return UsageRollupView::default();
};
fn now_day() -> i32 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| (d.as_millis() / 86_400_000) as i32)
.unwrap_or(0)
}
fn sorted_day_series(map: &HashMap<i32, UsageBucket>) -> Vec<(i32, UsageBucket)> {
let mut out = map.iter().map(|(k, v)| (*k, v.clone())).collect::<Vec<_>>();
out.sort_by_key(|(k, _)| *k);
out
}
fn filled_day_series(
map: &HashMap<i32, UsageBucket>,
start_day: i32,
end_day: i32,
) -> Vec<(i32, UsageBucket)> {
if start_day > end_day {
return Vec::new();
}
(start_day..=end_day)
.map(|day| (day, map.get(&day).cloned().unwrap_or_default()))
.collect()
}
fn sum_series(series: &[(i32, UsageBucket)]) -> UsageBucket {
let mut out = UsageBucket::default();
for (_, bucket) in series {
out.add_assign(bucket);
}
out
}
fn aggregate_entity_window(
source: &HashMap<String, HashMap<i32, UsageBucket>>,
start_day: Option<i32>,
end_day: Option<i32>,
top_n: usize,
) -> Vec<(String, UsageBucket)> {
let mut out = Vec::new();
for (name, days) in source {
let mut bucket = UsageBucket::default();
for (day, value) in days {
let include = match (start_day, end_day) {
(Some(start), Some(end)) => *day >= start && *day <= end,
_ => true,
};
if include {
bucket.add_assign(value);
}
}
if bucket.requests_total > 0 {
out.push((name.clone(), bucket));
}
}
out.sort_by(|(left_name, left), (right_name, right)| {
right
.usage
.total_tokens
.cmp(&left.usage.total_tokens)
.then_with(|| right.requests_total.cmp(&left.requests_total))
.then_with(|| left_name.cmp(right_name))
});
out.truncate(top_n);
out
}
let all_loaded = days == 0;
let loaded_first_day = rollup.by_day.keys().min().copied();
let loaded_last_day = rollup.by_day.keys().max().copied();
let loaded_days_with_data = rollup
.by_day
.values()
.filter(|bucket| bucket.requests_total > 0)
.count();
let (start_day, end_day) = if all_loaded {
(loaded_first_day, loaded_last_day)
} else {
let end = now_day();
let offset = i32::try_from(days.saturating_sub(1)).unwrap_or(i32::MAX);
(Some(end.saturating_sub(offset)), Some(end))
};
let by_day = match (all_loaded, start_day, end_day) {
(true, _, _) => sorted_day_series(&rollup.by_day),
(false, Some(start), Some(end)) => filled_day_series(&rollup.by_day, start, end),
_ => Vec::new(),
};
let window = if all_loaded {
rollup.loaded.clone()
} else {
sum_series(&by_day)
};
let mut by_config =
aggregate_entity_window(&rollup.by_config_day, start_day, end_day, top_n);
if all_loaded && by_config.is_empty() {
by_config = rollup
.by_config
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<_>>();
by_config.sort_by(|(left_name, left), (right_name, right)| {
right
.usage
.total_tokens
.cmp(&left.usage.total_tokens)
.then_with(|| right.requests_total.cmp(&left.requests_total))
.then_with(|| left_name.cmp(right_name))
});
by_config.truncate(top_n);
}
let mut by_provider =
aggregate_entity_window(&rollup.by_provider_day, start_day, end_day, top_n);
if all_loaded && by_provider.is_empty() {
by_provider = rollup
.by_provider
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<_>>();
by_provider.sort_by(|(left_name, left), (right_name, right)| {
right
.usage
.total_tokens
.cmp(&left.usage.total_tokens)
.then_with(|| right.requests_total.cmp(&left.requests_total))
.then_with(|| left_name.cmp(right_name))
});
by_provider.truncate(top_n);
}
let mut by_config_day = HashMap::new();
for (name, _) in &by_config {
let series = rollup
.by_config_day
.get(name)
.map(|m| match (all_loaded, start_day, end_day) {
(true, _, _) => sorted_day_series(m),
(false, Some(start), Some(end)) => filled_day_series(m, start, end),
_ => Vec::new(),
})
.unwrap_or_default();
by_config_day.insert(name.clone(), series);
}
let mut by_provider_day = HashMap::new();
for (name, _) in &by_provider {
let series = rollup
.by_provider_day
.get(name)
.map(|m| match (all_loaded, start_day, end_day) {
(true, _, _) => sorted_day_series(m),
(false, Some(start), Some(end)) => filled_day_series(m, start, end),
_ => Vec::new(),
})
.unwrap_or_default();
by_provider_day.insert(name.clone(), series);
}
let window_days_with_data = by_day
.iter()
.filter(|(_, bucket)| bucket.requests_total > 0)
.count();
let coverage = UsageRollupCoverage {
requested_days: days,
all_loaded,
loaded_first_day,
loaded_last_day,
loaded_days_with_data,
loaded_requests: rollup.loaded.requests_total,
window_first_day: start_day,
window_last_day: end_day,
window_days_with_data,
window_requests: window.requests_total,
window_exceeds_loaded_start: matches!(
(all_loaded, start_day, loaded_first_day),
(false, Some(start), Some(first)) if start < first
),
};
UsageRollupView {
loaded: rollup.loaded.clone(),
window,
coverage,
by_day,
by_config,
by_config_day,
by_provider,
by_provider_day,
}
}
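/// Rebuild usage rollups by replaying the tail of the JSONL requests log.
/// Skipped when disabled via `CODEX_HELPER_USAGE_REPLAY_ON_STARTUP`, when
/// rollups already hold data, or when the log file is missing; reads at
/// most `CODEX_HELPER_USAGE_REPLAY_MAX_BYTES` bytes and
/// `CODEX_HELPER_USAGE_REPLAY_MAX_LINES` lines. Returns the number of
/// replayed events.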
pub async fn replay_usage_from_requests_log(
&self,
service_name: &str,
log_path: PathBuf,
base_url_to_provider_id: HashMap<String, String>,
) -> usize {
let enabled = std::env::var("CODEX_HELPER_USAGE_REPLAY_ON_STARTUP")
.ok()
.map(|v| {
matches!(
v.trim().to_ascii_lowercase().as_str(),
"1" | "true" | "yes" | "y" | "on"
)
})
.unwrap_or(true);
if !enabled {
return 0;
}
let already_has_data = {
let guard = self.usage_rollups.read().await;
guard
.get(service_name)
.is_some_and(|r| r.loaded.requests_total > 0)
};
if already_has_data {
return 0;
}
if !log_path.exists() {
return 0;
}
let max_bytes = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_BYTES")
.ok()
.and_then(|s| s.trim().parse::<usize>().ok())
.filter(|&n| n > 0)
.unwrap_or(8 * 1024 * 1024);
let max_lines = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_LINES")
.ok()
.and_then(|s| s.trim().parse::<usize>().ok())
.filter(|&n| n > 0)
.unwrap_or(20_000);
let mut file = match std::fs::File::open(&log_path) {
Ok(f) => f,
Err(_) => return 0,
};
let len: u64 = file.metadata().map(|m| m.len()).unwrap_or_default();
let start = len.saturating_sub(max_bytes as u64);
if file.seek(SeekFrom::Start(start)).is_err() {
return 0;
}
let mut buf = Vec::new();
if file.read_to_end(&mut buf).is_err() {
return 0;
}
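        // Seeking into the middle of the file usually lands mid-line, so drop
        // everything up to the first newline and parse only complete records.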
if start > 0 {
if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
buf = buf[pos + 1..].to_vec();
} else {
return 0;
}
}
let text = match std::str::from_utf8(&buf) {
Ok(s) => s,
Err(_) => return 0,
};
let lines = text
.lines()
.map(|l| l.trim())
.filter(|l| !l.is_empty())
.collect::<Vec<_>>();
let start_idx = lines.len().saturating_sub(max_lines);
let mut events = Vec::new();
for line in &lines[start_idx..] {
let Ok(v) = serde_json::from_str::<JsonValue>(line) else {
continue;
};
let Some(svc) = v.get("service").and_then(|x| x.as_str()) else {
continue;
};
if svc != service_name {
continue;
}
let ended_at_ms = v.get("timestamp_ms").and_then(|x| x.as_u64()).unwrap_or(0);
let status_code = v.get("status_code").and_then(|x| x.as_u64()).unwrap_or(0) as u16;
let duration_ms = v.get("duration_ms").and_then(|x| x.as_u64()).unwrap_or(0);
let station_name = v
.get("station_name")
.and_then(|x| x.as_str())
.unwrap_or("-")
.to_string();
let upstream_base_url = v
.get("upstream_base_url")
.and_then(|x| x.as_str())
.unwrap_or("-")
.to_string();
let provider_id = v
.get("provider_id")
.and_then(|x| x.as_str())
.map(|s| s.to_string())
.or_else(|| base_url_to_provider_id.get(&upstream_base_url).cloned())
.unwrap_or_else(|| "-".to_string());
let usage = v
.get("usage")
.and_then(|u| serde_json::from_value::<UsageMetrics>(u.clone()).ok());
let ttfb_ms = v.get("ttfb_ms").and_then(|x| x.as_u64());
events.push((
ended_at_ms,
status_code,
duration_ms,
station_name,
provider_id,
usage,
ttfb_ms,
));
}
if events.is_empty() {
return 0;
}
let mut guard = self.usage_rollups.write().await;
let rollup = guard.entry(service_name.to_string()).or_default();
for (ended_at_ms, status_code, duration_ms, cfg_key, provider_key, usage, ttfb_ms) in
events.iter()
{
let day = (*ended_at_ms / 86_400_000) as i32;
rollup
.loaded
.record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
rollup.by_day.entry(day).or_default().record(
*status_code,
*duration_ms,
usage.as_ref(),
None,
*ttfb_ms,
);
rollup.by_config.entry(cfg_key.clone()).or_default().record(
*status_code,
*duration_ms,
usage.as_ref(),
None,
*ttfb_ms,
);
rollup
.by_config_day
.entry(cfg_key.clone())
.or_default()
.entry(day)
.or_default()
.record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
rollup
.by_provider
.entry(provider_key.clone())
.or_default()
.record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
rollup
.by_provider_day
.entry(provider_key.clone())
.or_default()
.entry(day)
.or_default()
.record(*status_code, *duration_ms, usage.as_ref(), None, *ttfb_ms);
}
events.len()
}
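    /// Resolves the working directory recorded in the Codex session
    /// transcript for `session_id`, caching results (including misses) per
    /// session. A max-entries setting of 0 disables the cache and always
    /// queries the filesystem.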
pub async fn resolve_session_cwd(&self, session_id: &str) -> Option<String> {
if self.session_cwd_cache_max_entries == 0 {
return sessions::find_codex_session_cwd_by_id(session_id)
.await
.ok()
.flatten();
}
        let now_ms = unix_now_ms();
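        // Fast path: serve a cached entry under the read lock, then briefly
        // take the write lock only to refresh its last-seen timestamp.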
{
let guard = self.session_cwd_cache.read().await;
if let Some(v) = guard.get(session_id) {
let out = v.cwd.clone();
drop(guard);
let mut guard = self.session_cwd_cache.write().await;
if let Some(v) = guard.get_mut(session_id) {
v.last_seen_ms = now_ms;
}
return out;
}
}
let resolved = sessions::find_codex_session_cwd_by_id(session_id)
.await
.ok()
.flatten();
let mut guard = self.session_cwd_cache.write().await;
guard.insert(
session_id.to_string(),
SessionCwdCacheEntry {
cwd: resolved.clone(),
last_seen_ms: now_ms,
},
);
resolved
}
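    /// Registers a new in-flight request and returns its process-unique id.
    /// Route details (station, provider, upstream) are attached later via
    /// `update_request_route`; the entry is removed by `finish_request`.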
#[allow(clippy::too_many_arguments)]
pub async fn begin_request(
&self,
service: &str,
method: &str,
path: &str,
session_id: Option<String>,
client_name: Option<String>,
client_addr: Option<String>,
cwd: Option<String>,
model: Option<String>,
reasoning_effort: Option<String>,
service_tier: Option<String>,
started_at_ms: u64,
) -> u64 {
let id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
let trace_id = Some(crate::logging::request_trace_id(service, id));
let req = ActiveRequest {
id,
trace_id,
session_id,
client_name,
client_addr,
cwd,
model,
reasoning_effort,
service_tier,
station_name: None,
provider_id: None,
upstream_base_url: None,
route_decision: None,
service: service.to_string(),
method: method.to_string(),
path: path.to_string(),
started_at_ms,
};
let mut guard = self.active_requests.write().await;
guard.insert(id, req);
id
}
pub async fn update_request_route(
&self,
request_id: u64,
station_name: Option<String>,
provider_id: Option<String>,
upstream_base_url: String,
route_decision: Option<RouteDecisionProvenance>,
) {
let mut guard = self.active_requests.write().await;
let Some(req) = guard.get_mut(&request_id) else {
return;
};
req.station_name = station_name;
req.provider_id = provider_id;
req.upstream_base_url = Some(upstream_base_url);
req.route_decision = route_decision;
}
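    /// Completes an in-flight request: estimates its cost from the operator
    /// catalog, folds the result into the per-service usage rollups and
    /// per-session stats, and pushes it onto the bounded recent-finished
    /// deque. Requests that are no longer active are ignored.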
pub async fn finish_request(&self, params: FinishRequestParams) {
let mut active = self.active_requests.write().await;
        let Some(req) = active.remove(&params.id) else {
return;
};
let pricing_model = req
.route_decision
.as_ref()
.and_then(|decision| decision.effective_model.as_ref())
.map(|value| value.value.as_str())
.or(req.model.as_deref());
let cost = estimate_request_cost_from_operator_catalog_for_service(
pricing_model,
params.usage.as_ref(),
CostAdjustments::default(),
&req.service,
);
let mut finished = FinishedRequest {
id: params.id,
trace_id: req.trace_id,
session_id: req.session_id,
client_name: req.client_name,
client_addr: req.client_addr,
cwd: req.cwd,
model: req.model,
reasoning_effort: req.reasoning_effort,
service_tier: params.observed_service_tier.or(req.service_tier),
station_name: req.station_name,
provider_id: req.provider_id,
upstream_base_url: req.upstream_base_url,
route_decision: req.route_decision,
usage: params.usage.clone(),
cost,
retry: params.retry,
observability: RequestObservability::default(),
service: req.service,
method: req.method,
path: req.path,
status_code: params.status_code,
duration_ms: params.duration_ms,
ttfb_ms: params.ttfb_ms,
streaming: params.streaming,
ended_at_ms: params.ended_at_ms,
};
finished.refresh_observability();
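        // Fold the finished request into the per-service rollups, keyed by
        // day, station ("-" when unrouted), and provider.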
{
let day = (finished.ended_at_ms / 86_400_000) as i32;
let cfg_key = finished
.station_name
.clone()
.unwrap_or_else(|| "-".to_string());
let provider_key = finished
.provider_id
.clone()
.unwrap_or_else(|| "-".to_string());
let mut rollups = self.usage_rollups.write().await;
let rollup = rollups.entry(finished.service.clone()).or_default();
rollup.loaded.record(
finished.status_code,
finished.duration_ms,
finished.usage.as_ref(),
Some(&finished.cost),
finished.ttfb_ms,
);
rollup.by_day.entry(day).or_default().record(
finished.status_code,
finished.duration_ms,
finished.usage.as_ref(),
Some(&finished.cost),
finished.ttfb_ms,
);
rollup.by_config.entry(cfg_key.clone()).or_default().record(
finished.status_code,
finished.duration_ms,
finished.usage.as_ref(),
Some(&finished.cost),
finished.ttfb_ms,
);
rollup
.by_config_day
.entry(cfg_key)
.or_default()
.entry(day)
.or_default()
.record(
finished.status_code,
finished.duration_ms,
finished.usage.as_ref(),
Some(&finished.cost),
finished.ttfb_ms,
);
rollup
.by_provider
.entry(provider_key.clone())
.or_default()
.record(
finished.status_code,
finished.duration_ms,
finished.usage.as_ref(),
Some(&finished.cost),
finished.ttfb_ms,
);
rollup
.by_provider_day
.entry(provider_key)
.or_default()
.entry(day)
.or_default()
.record(
finished.status_code,
finished.duration_ms,
finished.usage.as_ref(),
Some(&finished.cost),
finished.ttfb_ms,
);
}
if let Some(sid) = finished.session_id.as_deref() {
let mut stats = self.session_stats.write().await;
let entry = stats.entry(sid.to_string()).or_default();
entry.turns_total = entry.turns_total.saturating_add(1);
entry.last_client_name = finished
.client_name
.clone()
.or(entry.last_client_name.clone());
entry.last_client_addr = finished
.client_addr
.clone()
.or(entry.last_client_addr.clone());
entry.last_model = finished.model.clone().or(entry.last_model.clone());
entry.last_reasoning_effort = finished
.reasoning_effort
.clone()
.or(entry.last_reasoning_effort.clone());
entry.last_service_tier = finished
.service_tier
.clone()
.or(entry.last_service_tier.clone());
entry.last_provider_id = finished
.provider_id
.clone()
.or(entry.last_provider_id.clone());
entry.last_station_name = finished
.station_name
.clone()
.or(entry.last_station_name.clone());
if finished.route_decision.is_some() {
entry.last_route_decision = finished.route_decision.clone();
}
if let Some(u) = finished.usage.as_ref() {
entry.last_usage = Some(u.clone());
entry.total_usage.add_assign(u);
entry.turns_with_usage = entry.turns_with_usage.saturating_add(1);
}
entry.last_status = Some(finished.status_code);
entry.last_duration_ms = Some(finished.duration_ms);
entry.last_ended_at_ms = Some(finished.ended_at_ms);
entry.last_seen_ms = finished.ended_at_ms;
}
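        // Newest first; the deque is capped at recent_finished_max() entries.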
let mut recent = self.recent_finished.write().await;
recent.push_front(finished);
while recent.len() > recent_finished_max() {
recent.pop_back();
}
}
pub async fn list_active_requests(&self) -> Vec<ActiveRequest> {
let guard = self.active_requests.read().await;
let mut vec = guard.values().cloned().collect::<Vec<_>>();
vec.sort_by_key(|r| r.started_at_ms);
vec
}
pub async fn list_recent_finished(&self, limit: usize) -> Vec<FinishedRequest> {
let guard = self.recent_finished.read().await;
guard.iter().take(limit).cloned().collect()
}
pub async fn list_session_stats(&self) -> HashMap<String, SessionStats> {
let guard = self.session_stats.read().await;
guard.clone()
}
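    /// Builds session identity cards by joining active requests, recent
    /// finished requests, every per-session override dimension, sticky
    /// bindings, route affinities, the global station override, and
    /// accumulated session stats.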
pub async fn list_session_identity_cards(
&self,
recent_limit: usize,
) -> Vec<SessionIdentityCard> {
let recent_limit = recent_limit.clamp(1, recent_finished_max());
let (
active,
recent,
overrides,
station_overrides,
model_overrides,
service_tier_overrides,
bindings,
route_affinities,
global_station_override,
stats,
) = tokio::join!(
self.list_active_requests(),
self.list_recent_finished(recent_limit),
self.list_session_effort_overrides(),
self.list_session_station_overrides(),
self.list_session_model_overrides(),
self.list_session_service_tier_overrides(),
self.list_session_bindings(),
self.list_session_route_affinities(),
self.get_global_station_override(),
self.list_session_stats(),
);
build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
active: &active,
recent: &recent,
overrides: &overrides,
station_overrides: &station_overrides,
model_overrides: &model_overrides,
service_tier_overrides: &service_tier_overrides,
bindings: &bindings,
route_affinities: &route_affinities,
global_station_override: global_station_override.as_deref(),
stats: &stats,
})
}
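    /// Resolves host-local transcript file paths for a batch of session ids,
    /// deduplicating the input and serving fresh entries from the TTL cache.
    /// A max-entries setting of 0 bypasses the cache entirely.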
async fn resolve_host_transcript_paths_cached(
&self,
session_ids: &[String],
) -> HashMap<String, Option<String>> {
let mut unique = session_ids
.iter()
.map(|sid| sid.trim())
.filter(|sid| !sid.is_empty())
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
unique.sort();
unique.dedup();
if unique.is_empty() {
return HashMap::new();
}
if self.session_transcript_path_cache_max_entries == 0 {
return sessions::find_codex_session_files_by_ids(&unique)
.await
.unwrap_or_default()
.into_iter()
.map(|(sid, path)| (sid, Some(path.to_string_lossy().to_string())))
.collect();
}
let now_ms = unix_now_ms();
let ttl_ms = self.session_transcript_path_cache_ttl_ms;
let mut resolved = HashMap::<String, Option<String>>::new();
let mut stale_or_missing = Vec::<String>::new();
{
let cache = self.session_transcript_path_cache.read().await;
for sid in &unique {
let fresh = cache.get(sid).filter(|entry| {
ttl_ms == 0 || now_ms.saturating_sub(entry.last_checked_ms) <= ttl_ms
});
if let Some(entry) = fresh {
resolved.insert(sid.clone(), entry.path.clone());
} else {
stale_or_missing.push(sid.clone());
}
}
}
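        // Only stale or missing ids hit the filesystem; negative results are
        // cached too, so absent transcripts are not re-scanned on every call.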
if !stale_or_missing.is_empty() {
let found = sessions::find_codex_session_files_by_ids(&stale_or_missing)
.await
.unwrap_or_default();
let mut cache = self.session_transcript_path_cache.write().await;
for sid in stale_or_missing {
let path = found
.get(&sid)
.map(|path| path.to_string_lossy().to_string());
cache.insert(
sid.clone(),
SessionTranscriptPathCacheEntry {
path: path.clone(),
last_checked_ms: now_ms,
last_seen_ms: now_ms,
},
);
resolved.insert(sid, path);
}
prune_lru_cache(
&mut cache,
self.session_transcript_path_cache_max_entries,
|entry| entry.last_seen_ms,
);
}
{
let mut cache = self.session_transcript_path_cache.write().await;
for sid in &unique {
if let Some(entry) = cache.get_mut(sid) {
entry.last_seen_ms = now_ms;
}
}
}
resolved
}
pub async fn enrich_session_identity_cards_with_cached_host_transcripts(
&self,
cards: &mut [SessionIdentityCard],
) {
let session_ids = cards
.iter()
.filter_map(|card| card.session_id.as_deref())
.map(str::to_owned)
.collect::<Vec<_>>();
let resolved = self
.resolve_host_transcript_paths_cached(&session_ids)
.await;
for card in cards {
card.host_local_transcript_path = card
.session_id
.as_deref()
.and_then(|sid| resolved.get(sid))
.and_then(Clone::clone);
}
}
pub async fn list_session_identity_cards_with_host_transcripts(
&self,
recent_limit: usize,
) -> Vec<SessionIdentityCard> {
let mut cards = self.list_session_identity_cards(recent_limit).await;
self.enrich_session_identity_cards_with_cached_host_transcripts(&mut cards)
.await;
cards
}
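    /// Spawns a background task that runs `prune_periodic` every 30 seconds.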
pub fn spawn_cleanup_task(state: Arc<Self>) {
tokio::spawn(async move {
let mut tick = interval(Duration::from_secs(30));
loop {
tick.tick().await;
state.prune_periodic().await;
}
});
}
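    /// One cleanup pass: drops session overrides, bindings, affinities,
    /// stats, and cache entries past their TTLs (sessions with in-flight
    /// requests are always kept), enforces max-entry caps, and trims usage
    /// rollup days older than `CODEX_HELPER_USAGE_ROLLUP_KEEP_DAYS`.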
async fn prune_periodic(&self) {
        let now_ms = unix_now_ms();
let active = self.active_requests.read().await;
let mut active_sessions: HashMap<String, ()> = HashMap::new();
for req in active.values() {
if let Some(sid) = req.session_id.as_deref() {
active_sessions.insert(sid.to_string(), ());
}
}
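        // `now_ms >= ttl` guards the subtractions below against u64 underflow
        // when the clock reports a timestamp smaller than the TTL.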
        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
            let cutoff_override = now_ms - self.session_override_ttl_ms;
            self.session_effort_overrides.write().await.retain(|sid, v| {
                active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_override
            });
            self.session_station_overrides
                .write()
                .await
                .retain(|sid, v| {
                    active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_override
                });
            self.session_route_target_overrides
                .write()
                .await
                .retain(|sid, v| {
                    active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_override
                });
            self.session_model_overrides.write().await.retain(|sid, v| {
                active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_override
            });
            self.session_service_tier_overrides
                .write()
                .await
                .retain(|sid, v| {
                    active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_override
                });
        }
if self.session_binding_ttl_ms > 0 && now_ms >= self.session_binding_ttl_ms {
let cutoff_binding = now_ms - self.session_binding_ttl_ms;
let mut bindings = self.session_bindings.write().await;
bindings.retain(|sid, entry| {
if active_sessions.contains_key(sid) {
return true;
}
entry.binding.last_seen_ms >= cutoff_binding
});
}
if self.session_binding_max_entries > 0 {
let mut bindings = self.session_bindings.write().await;
if bindings.len() > self.session_binding_max_entries {
let mut removable = bindings
.iter()
.filter(|(sid, _)| !active_sessions.contains_key(*sid))
.map(|(sid, entry)| (sid.clone(), entry.binding.last_seen_ms))
.collect::<Vec<_>>();
removable.sort_by_key(|(_, last_seen_ms)| *last_seen_ms);
let remove_count = bindings
.len()
.saturating_sub(self.session_binding_max_entries)
.min(removable.len());
for (sid, _) in removable.into_iter().take(remove_count) {
bindings.remove(&sid);
}
}
}
{
let mut affinities = self.session_route_affinities.write().await;
if self.session_route_affinity_ttl_ms > 0
&& now_ms >= self.session_route_affinity_ttl_ms
{
let cutoff_affinity = now_ms - self.session_route_affinity_ttl_ms;
affinities.retain(|sid, affinity| {
active_sessions.contains_key(sid)
|| affinity.last_selected_at_ms >= cutoff_affinity
});
}
prune_lru_cache(
&mut affinities,
self.session_route_affinity_max_entries,
|entry| entry.last_selected_at_ms,
);
}
let keep_days: i32 = std::env::var("CODEX_HELPER_USAGE_ROLLUP_KEEP_DAYS")
.ok()
.and_then(|s| s.trim().parse::<i32>().ok())
.filter(|&n| n > 0)
.unwrap_or(60);
let now_day = (now_ms / 86_400_000) as i32;
let cutoff_day = now_day.saturating_sub(keep_days);
let mut rollups = self.usage_rollups.write().await;
for rollup in rollups.values_mut() {
rollup.by_day.retain(|day, _| *day >= cutoff_day);
rollup.by_config_day.retain(|_, m| {
m.retain(|day, _| *day >= cutoff_day);
!m.is_empty()
});
rollup.by_provider_day.retain(|_, m| {
m.retain(|day, _| *day >= cutoff_day);
!m.is_empty()
});
}
let cutoff_cwd =
if self.session_cwd_cache_ttl_ms == 0 || now_ms < self.session_cwd_cache_ttl_ms {
0
} else {
now_ms - self.session_cwd_cache_ttl_ms
};
self.prune_session_cwd_cache(&active_sessions, cutoff_cwd)
.await;
self.prune_session_transcript_path_cache(now_ms).await;
if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
let cutoff_stats = now_ms - self.session_override_ttl_ms;
let mut stats = self.session_stats.write().await;
stats.retain(|sid, v| {
active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_stats
});
}
}
async fn prune_session_cwd_cache(&self, active_sessions: &HashMap<String, ()>, cutoff: u64) {
if self.session_cwd_cache_max_entries == 0 {
return;
}
let mut cache = self.session_cwd_cache.write().await;
if self.session_cwd_cache_ttl_ms > 0 {
cache.retain(|sid, v| {
if active_sessions.contains_key(sid) {
return true;
}
v.last_seen_ms >= cutoff
});
}
        let max = self.session_cwd_cache_max_entries;
        if cache.len() <= max {
            return;
        }
let mut keys = cache
.iter()
.map(|(sid, v)| (sid.clone(), v.last_seen_ms))
.collect::<Vec<_>>();
keys.sort_by_key(|(_, t)| *t);
let remove_count = keys.len().saturating_sub(max);
for (sid, _) in keys.into_iter().take(remove_count) {
cache.remove(&sid);
}
}
async fn prune_session_transcript_path_cache(&self, now_ms: u64) {
let mut cache = self.session_transcript_path_cache.write().await;
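        // A max-entries setting of 0 disables this cache, so drop anything
        // that may have been left behind.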
if self.session_transcript_path_cache_max_entries == 0 {
cache.clear();
return;
}
if self.session_transcript_path_cache_ttl_ms > 0 {
let cutoff = now_ms.saturating_sub(self.session_transcript_path_cache_ttl_ms);
cache.retain(|_, entry| entry.last_seen_ms >= cutoff);
}
prune_lru_cache(
&mut cache,
self.session_transcript_path_cache_max_entries,
|entry| entry.last_seen_ms,
);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{ServiceConfig, ServiceConfigManager, UpstreamAuth, UpstreamConfig};
use crate::runtime_identity::ProviderEndpointKey;
fn test_runtime_policy(
session_override_ttl_ms: u64,
session_binding_ttl_ms: u64,
session_binding_max_entries: usize,
) -> RuntimePolicy {
RuntimePolicy {
session_override_ttl_ms,
session_binding_ttl_ms,
session_binding_max_entries,
session_route_affinity_ttl_ms: 0,
session_route_affinity_max_entries: 5_000,
session_cwd_cache_ttl_ms: 0,
session_cwd_cache_max_entries: 0,
session_transcript_path_cache_ttl_ms: 30_000,
session_transcript_path_cache_max_entries: 5_000,
}
}
#[test]
fn begin_and_finish_requests_keep_trace_id() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
let request_id = state
.begin_request(
"codex",
"POST",
"/v1/responses",
None,
None,
None,
None,
Some("gpt-5".to_string()),
None,
Some("priority".to_string()),
100,
)
.await;
let active = state.list_active_requests().await;
assert_eq!(active[0].trace_id.as_deref(), Some("codex-1"));
state
.finish_request(FinishRequestParams {
id: request_id,
status_code: 200,
duration_ms: 10,
ended_at_ms: 110,
observed_service_tier: Some("priority".to_string()),
usage: None,
retry: None,
ttfb_ms: Some(4),
streaming: false,
})
.await;
let recent = state.list_recent_finished(1).await;
assert_eq!(recent[0].trace_id.as_deref(), Some("codex-1"));
assert_eq!(recent[0].observability.trace_id.as_deref(), Some("codex-1"));
assert!(recent[0].observability.fast_mode);
assert_eq!(recent[0].observability.generation_ms, Some(6));
});
}
#[test]
fn finish_request_estimates_cost_and_rolls_up_cost() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
let request_id = state
.begin_request(
"codex",
"POST",
"/v1/responses",
None,
None,
None,
None,
Some("gpt-5".to_string()),
None,
None,
100,
)
.await;
state
.finish_request(FinishRequestParams {
id: request_id,
status_code: 200,
duration_ms: 10,
ended_at_ms: 110,
observed_service_tier: None,
usage: Some(UsageMetrics {
input_tokens: 1_000,
output_tokens: 500,
cached_input_tokens: 100,
total_tokens: 1_500,
..UsageMetrics::default()
}),
retry: None,
ttfb_ms: Some(4),
streaming: false,
})
.await;
let recent = state.list_recent_finished(1).await;
assert_eq!(recent[0].cost.total_cost_usd.as_deref(), Some("0.0061375"));
let rollup = state.get_usage_rollup_view("codex", 12, 1).await;
assert_eq!(
rollup.loaded.cost.total_cost_usd.as_deref(),
Some("0.0061375")
);
assert_eq!(rollup.loaded.cost.priced_requests, 1);
assert_eq!(rollup.loaded.cost.unpriced_requests, 0);
});
}
#[test]
fn usage_rollup_view_scores_entities_inside_selected_window() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let old_ms = now_ms.saturating_sub(10 * 86_400_000);
let old_id = state
.begin_request(
"codex",
"POST",
"/v1/responses",
None,
None,
None,
None,
Some("gpt-5".to_string()),
None,
None,
old_ms.saturating_sub(1_000),
)
.await;
state
.update_request_route(
old_id,
Some("old-station".to_string()),
Some("old-provider".to_string()),
"https://old.example/v1".to_string(),
None,
)
.await;
state
.finish_request(FinishRequestParams {
id: old_id,
status_code: 200,
duration_ms: 20,
ended_at_ms: old_ms,
observed_service_tier: None,
usage: Some(UsageMetrics {
total_tokens: 100_000,
..UsageMetrics::default()
}),
retry: None,
ttfb_ms: Some(5),
streaming: false,
})
.await;
let fresh_id = state
.begin_request(
"codex",
"POST",
"/v1/responses",
None,
None,
None,
None,
Some("gpt-5".to_string()),
None,
None,
now_ms.saturating_sub(1_000),
)
.await;
state
.update_request_route(
fresh_id,
Some("fresh-station".to_string()),
Some("fresh-provider".to_string()),
"https://fresh.example/v1".to_string(),
None,
)
.await;
state
.finish_request(FinishRequestParams {
id: fresh_id,
status_code: 200,
duration_ms: 10,
ended_at_ms: now_ms,
observed_service_tier: None,
usage: Some(UsageMetrics {
total_tokens: 10,
..UsageMetrics::default()
}),
retry: None,
ttfb_ms: Some(3),
streaming: false,
})
.await;
let week = state.get_usage_rollup_view("codex", 10, 7).await;
assert_eq!(week.loaded.requests_total, 2);
assert_eq!(week.window.requests_total, 1);
assert_eq!(week.by_day.len(), 7);
assert_eq!(week.by_config[0].0, "fresh-station");
assert_eq!(week.by_provider[0].0, "fresh-provider");
let loaded = state.get_usage_rollup_view("codex", 10, 0).await;
assert_eq!(loaded.window.requests_total, 2);
assert_eq!(loaded.by_config[0].0, "old-station");
assert_eq!(loaded.by_provider[0].0, "old-provider");
});
}
#[test]
fn build_session_identity_cards_merges_sources_and_sorts_newest_first() {
let active = vec![ActiveRequest {
id: 1,
trace_id: Some("codex-1".to_string()),
session_id: Some("sid-active".to_string()),
client_name: Some("Frank-Laptop".to_string()),
client_addr: Some("100.64.0.8".to_string()),
cwd: Some("G:/codes/project".to_string()),
model: Some("gpt-5.4".to_string()),
reasoning_effort: Some("medium".to_string()),
service_tier: Some("priority".to_string()),
station_name: Some("right".to_string()),
provider_id: Some("right".to_string()),
upstream_base_url: Some("https://right.example/v1".to_string()),
route_decision: None,
service: "codex".to_string(),
method: "POST".to_string(),
path: "/v1/responses".to_string(),
started_at_ms: 500,
}];
let recent = vec![
FinishedRequest {
id: 2,
trace_id: Some("codex-2".to_string()),
session_id: Some("sid-recent".to_string()),
client_name: Some("Studio-Mini".to_string()),
client_addr: Some("100.64.0.9".to_string()),
cwd: Some("G:/codes/other".to_string()),
model: Some("gpt-5.3".to_string()),
reasoning_effort: Some("high".to_string()),
service_tier: Some("default".to_string()),
station_name: Some("vibe".to_string()),
provider_id: Some("vibe".to_string()),
upstream_base_url: Some("https://vibe.example/v1".to_string()),
route_decision: None,
usage: Some(UsageMetrics {
input_tokens: 1,
output_tokens: 2,
reasoning_tokens: 3,
total_tokens: 6,
..UsageMetrics::default()
}),
cost: CostBreakdown::default(),
retry: None,
observability: RequestObservability::default(),
service: "codex".to_string(),
method: "POST".to_string(),
path: "/v1/responses".to_string(),
status_code: 200,
duration_ms: 1200,
ttfb_ms: Some(100),
streaming: false,
ended_at_ms: 2_000,
},
FinishedRequest {
id: 3,
trace_id: Some("codex-3".to_string()),
session_id: Some("sid-active".to_string()),
client_name: Some("Frank-Laptop".to_string()),
client_addr: Some("100.64.0.8".to_string()),
cwd: Some("G:/codes/project".to_string()),
model: Some("gpt-5.4".to_string()),
reasoning_effort: Some("low".to_string()),
service_tier: Some("flex".to_string()),
station_name: Some("right".to_string()),
provider_id: Some("right".to_string()),
upstream_base_url: Some("https://right.example/v1".to_string()),
route_decision: None,
usage: None,
cost: CostBreakdown::default(),
retry: None,
observability: RequestObservability::default(),
service: "codex".to_string(),
method: "POST".to_string(),
path: "/v1/responses".to_string(),
status_code: 429,
duration_ms: 900,
ttfb_ms: None,
streaming: false,
ended_at_ms: 1_000,
},
];
let overrides = HashMap::from([("sid-active".to_string(), "xhigh".to_string())]);
let config_overrides = HashMap::from([("sid-active".to_string(), "temp".to_string())]);
let model_overrides =
HashMap::from([("sid-active".to_string(), "gpt-5.4-mini".to_string())]);
let service_tier_overrides =
HashMap::from([("sid-active".to_string(), "priority".to_string())]);
let stats = HashMap::from([(
"sid-active".to_string(),
SessionStats {
turns_total: 3,
last_client_name: Some("Frank-Laptop".to_string()),
last_client_addr: Some("100.64.0.8".to_string()),
last_model: Some("gpt-5.4".to_string()),
last_reasoning_effort: Some("low".to_string()),
last_service_tier: Some("flex".to_string()),
last_provider_id: Some("right".to_string()),
last_station_name: Some("right".to_string()),
last_route_decision: None,
last_usage: None,
total_usage: UsageMetrics {
input_tokens: 10,
output_tokens: 20,
reasoning_tokens: 5,
total_tokens: 35,
..UsageMetrics::default()
},
turns_with_usage: 2,
last_status: Some(429),
last_duration_ms: Some(900),
last_ended_at_ms: Some(1_000),
last_seen_ms: 1_000,
},
)]);
let cards = build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
active: &active,
recent: &recent,
overrides: &overrides,
station_overrides: &config_overrides,
model_overrides: &model_overrides,
service_tier_overrides: &service_tier_overrides,
bindings: &HashMap::new(),
route_affinities: &HashMap::new(),
global_station_override: None,
stats: &stats,
});
assert_eq!(cards.len(), 2);
assert_eq!(cards[0].session_id.as_deref(), Some("sid-recent"));
assert_eq!(
cards[0].observation_scope,
SessionObservationScope::HostLocalEnriched
);
assert_eq!(cards[0].last_client_name.as_deref(), Some("Studio-Mini"));
assert_eq!(cards[0].last_client_addr.as_deref(), Some("100.64.0.9"));
assert_eq!(cards[1].session_id.as_deref(), Some("sid-active"));
assert_eq!(
cards[1].observation_scope,
SessionObservationScope::HostLocalEnriched
);
assert_eq!(cards[1].active_count, 1);
assert_eq!(cards[1].last_client_name.as_deref(), Some("Frank-Laptop"));
assert_eq!(cards[1].last_client_addr.as_deref(), Some("100.64.0.8"));
assert_eq!(cards[1].last_status, Some(429));
assert_eq!(cards[1].override_effort.as_deref(), Some("xhigh"));
assert_eq!(cards[1].override_station_name.as_deref(), Some("temp"));
assert_eq!(cards[1].override_model.as_deref(), Some("gpt-5.4-mini"));
assert_eq!(cards[1].override_service_tier.as_deref(), Some("priority"));
assert_eq!(
cards[1]
.effective_model
.as_ref()
.map(|value| value.value.as_str()),
Some("gpt-5.4-mini")
);
assert_eq!(
cards[1].effective_model.as_ref().map(|value| value.source),
Some(RouteValueSource::SessionOverride)
);
assert_eq!(
cards[1]
.effective_reasoning_effort
.as_ref()
.map(|value| value.source),
Some(RouteValueSource::SessionOverride)
);
assert_eq!(
cards[1]
.effective_service_tier
.as_ref()
.map(|value| value.source),
Some(RouteValueSource::SessionOverride)
);
assert_eq!(
cards[1]
.effective_station
.as_ref()
.map(|value| value.source),
Some(RouteValueSource::SessionOverride)
);
assert!(cards[1].effective_upstream_base_url.is_none());
assert_eq!(
cards[1].last_upstream_base_url.as_deref(),
Some("https://right.example/v1")
);
assert_eq!(cards[1].turns_total, Some(3));
assert_eq!(cards[1].last_service_tier.as_deref(), Some("flex"));
assert_eq!(
cards[1].total_usage.as_ref().map(|u| u.total_tokens),
Some(35)
);
}
#[test]
fn build_session_identity_cards_prefers_binding_defaults_for_effective_route() {
let active = vec![ActiveRequest {
id: 1,
trace_id: Some("codex-1".to_string()),
session_id: Some("sid-bound".to_string()),
client_name: Some("Workstation".to_string()),
client_addr: Some("100.64.0.10".to_string()),
cwd: None,
model: Some("gpt-observed".to_string()),
reasoning_effort: Some("medium".to_string()),
service_tier: Some("default".to_string()),
station_name: Some("right".to_string()),
provider_id: None,
upstream_base_url: None,
route_decision: None,
service: "codex".to_string(),
method: "POST".to_string(),
path: "/v1/responses".to_string(),
started_at_ms: 10,
}];
let bindings = HashMap::from([(
"sid-bound".to_string(),
SessionBinding {
session_id: "sid-bound".to_string(),
profile_name: Some("daily".to_string()),
station_name: Some("vibe".to_string()),
model: Some("gpt-bound".to_string()),
reasoning_effort: Some("high".to_string()),
service_tier: Some("priority".to_string()),
continuity_mode: SessionContinuityMode::DefaultProfile,
created_at_ms: 1,
updated_at_ms: 1,
last_seen_ms: 10,
},
)]);
let cards = build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
active: &active,
recent: &[],
overrides: &HashMap::new(),
station_overrides: &HashMap::new(),
model_overrides: &HashMap::new(),
service_tier_overrides: &HashMap::new(),
bindings: &bindings,
route_affinities: &HashMap::new(),
global_station_override: None,
stats: &HashMap::new(),
});
assert_eq!(cards[0].binding_profile_name.as_deref(), Some("daily"));
assert_eq!(
cards[0].observation_scope,
SessionObservationScope::ObservedOnly
);
assert_eq!(
cards[0].binding_continuity_mode,
Some(SessionContinuityMode::DefaultProfile)
);
assert_eq!(
cards[0]
.effective_model
.as_ref()
.map(|value| (value.value.as_str(), value.source)),
Some(("gpt-bound", RouteValueSource::ProfileDefault))
);
assert_eq!(
cards[0]
.effective_reasoning_effort
.as_ref()
.map(|value| (value.value.as_str(), value.source)),
Some(("high", RouteValueSource::ProfileDefault))
);
assert_eq!(
cards[0]
.effective_service_tier
.as_ref()
.map(|value| (value.value.as_str(), value.source)),
Some(("priority", RouteValueSource::ProfileDefault))
);
assert_eq!(
cards[0]
.effective_station
.as_ref()
.map(|value| (value.value.as_str(), value.source)),
Some(("vibe", RouteValueSource::ProfileDefault))
);
}
#[test]
fn build_session_identity_cards_keeps_binding_values_but_allows_global_config_override() {
let active = vec![ActiveRequest {
id: 1,
trace_id: Some("codex-1".to_string()),
session_id: Some("sid-bound".to_string()),
client_name: Some("Workstation".to_string()),
client_addr: Some("100.64.0.10".to_string()),
cwd: None,
model: Some("gpt-observed".to_string()),
reasoning_effort: Some("medium".to_string()),
service_tier: Some("default".to_string()),
station_name: Some("vibe".to_string()),
provider_id: None,
upstream_base_url: Some("https://vibe.example/v1".to_string()),
route_decision: None,
service: "codex".to_string(),
method: "POST".to_string(),
path: "/v1/responses".to_string(),
started_at_ms: 10,
}];
let bindings = HashMap::from([(
"sid-bound".to_string(),
SessionBinding {
session_id: "sid-bound".to_string(),
profile_name: Some("daily".to_string()),
station_name: Some("vibe".to_string()),
model: Some("gpt-bound".to_string()),
reasoning_effort: Some("high".to_string()),
service_tier: Some("priority".to_string()),
continuity_mode: SessionContinuityMode::DefaultProfile,
created_at_ms: 1,
updated_at_ms: 1,
last_seen_ms: 10,
},
)]);
let cards = build_session_identity_cards_from_parts(SessionIdentityCardBuildInputs {
active: &active,
recent: &[],
overrides: &HashMap::new(),
station_overrides: &HashMap::new(),
model_overrides: &HashMap::new(),
service_tier_overrides: &HashMap::new(),
bindings: &bindings,
route_affinities: &HashMap::new(),
global_station_override: Some("right"),
stats: &HashMap::new(),
});
assert_eq!(cards[0].binding_profile_name.as_deref(), Some("daily"));
assert_eq!(
cards[0]
.effective_model
.as_ref()
.map(|value| (value.value.as_str(), value.source)),
Some(("gpt-bound", RouteValueSource::ProfileDefault))
);
assert_eq!(
cards[0]
.effective_reasoning_effort
.as_ref()
.map(|value| (value.value.as_str(), value.source)),
Some(("high", RouteValueSource::ProfileDefault))
);
assert_eq!(
cards[0]
.effective_service_tier
.as_ref()
.map(|value| (value.value.as_str(), value.source)),
Some(("priority", RouteValueSource::ProfileDefault))
);
assert_eq!(
cards[0]
.effective_station
.as_ref()
.map(|value| (value.value.as_str(), value.source)),
Some(("right", RouteValueSource::GlobalOverride))
);
assert!(cards[0].effective_upstream_base_url.is_none());
}
#[test]
fn enrich_session_identity_cards_with_runtime_applies_station_mapping_and_single_upstream() {
let mut cards = vec![SessionIdentityCard {
session_id: Some("sid-1".to_string()),
last_model: Some("gpt-5.4".to_string()),
last_station_name: Some("right".to_string()),
last_upstream_base_url: Some("https://right.example/v1".to_string()),
effective_model: Some(ResolvedRouteValue::new(
"gpt-5.4",
RouteValueSource::RequestPayload,
)),
effective_station: Some(ResolvedRouteValue::new(
"right",
RouteValueSource::RuntimeFallback,
)),
..SessionIdentityCard::default()
}];
let mut mgr = ServiceConfigManager {
active: Some("right".to_string()),
..ServiceConfigManager::default()
};
mgr.configs.insert(
"right".to_string(),
ServiceConfig {
name: "right".to_string(),
alias: None,
enabled: true,
level: 1,
upstreams: vec![UpstreamConfig {
base_url: "https://right.example/v1".to_string(),
auth: UpstreamAuth::default(),
tags: HashMap::new(),
supported_models: HashMap::new(),
model_mapping: HashMap::from([(
"gpt-5.4".to_string(),
"gpt-5.4-fast".to_string(),
)]),
}],
},
);
enrich_session_identity_cards_with_runtime(&mut cards, &mgr);
assert_eq!(
cards[0]
.effective_model
.as_ref()
.map(|value| value.value.as_str()),
Some("gpt-5.4-fast")
);
assert_eq!(
cards[0].effective_model.as_ref().map(|value| value.source),
Some(RouteValueSource::StationMapping)
);
assert_eq!(
cards[0]
.effective_upstream_base_url
.as_ref()
.map(|value| value.value.as_str()),
Some("https://right.example/v1")
);
}
#[test]
fn apply_session_profile_binding_sets_binding_and_clears_manual_overrides() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
let now_ms = 42;
let mut mgr = ServiceConfigManager::default();
mgr.configs.insert(
"right".to_string(),
ServiceConfig {
name: "right".to_string(),
alias: None,
enabled: true,
level: 1,
upstreams: vec![UpstreamConfig {
base_url: "https://right.example/v1".to_string(),
auth: UpstreamAuth::default(),
tags: HashMap::from([
("supports_reasoning_effort".to_string(), "true".to_string()),
("supports_service_tier".to_string(), "true".to_string()),
]),
supported_models: HashMap::from([("gpt-5.4".to_string(), true)]),
model_mapping: HashMap::new(),
}],
},
);
mgr.profiles.insert(
"fast".to_string(),
crate::config::ServiceControlProfile {
extends: None,
station: Some("right".to_string()),
model: Some("gpt-5.4".to_string()),
reasoning_effort: Some("low".to_string()),
service_tier: Some("flex".to_string()),
},
);
state
.set_session_station_override("sid-1".to_string(), "other".to_string(), 1)
.await;
state
.set_session_model_override("sid-1".to_string(), "gpt-x".to_string(), 1)
.await;
state
.set_session_effort_override("sid-1".to_string(), "high".to_string(), 1)
.await;
state
.set_session_service_tier_override("sid-1".to_string(), "priority".to_string(), 1)
.await;
state
.apply_session_profile_binding(
"codex",
&mgr,
"sid-1".to_string(),
"fast".to_string(),
now_ms,
)
.await
.expect("apply profile");
let binding = state
.get_session_binding("sid-1")
.await
.expect("binding exists");
assert_eq!(binding.profile_name.as_deref(), Some("fast"));
assert_eq!(binding.station_name.as_deref(), Some("right"));
assert_eq!(binding.model.as_deref(), Some("gpt-5.4"));
assert_eq!(binding.reasoning_effort.as_deref(), Some("low"));
assert_eq!(binding.service_tier.as_deref(), Some("flex"));
assert_eq!(
binding.continuity_mode,
SessionContinuityMode::ManualProfile
);
assert_eq!(binding.updated_at_ms, now_ms);
assert!(state.get_session_station_override("sid-1").await.is_none());
assert!(state.get_session_model_override("sid-1").await.is_none());
assert!(state.get_session_effort_override("sid-1").await.is_none());
assert!(
state
.get_session_service_tier_override("sid-1")
.await
.is_none()
);
});
}
#[test]
fn list_session_manual_overrides_merges_all_dimensions() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
state
.set_session_reasoning_effort_override("sid-1".to_string(), "high".to_string(), 1)
.await;
state
.set_session_station_override("sid-1".to_string(), "right".to_string(), 1)
.await;
state
.set_session_model_override("sid-1".to_string(), "gpt-5.4".to_string(), 1)
.await;
state
.set_session_service_tier_override("sid-1".to_string(), "priority".to_string(), 1)
.await;
state
.set_session_model_override("sid-2".to_string(), "gpt-5.4-mini".to_string(), 2)
.await;
let merged = state.list_session_manual_overrides().await;
assert_eq!(merged.len(), 2);
assert_eq!(
merged
.get("sid-1")
.and_then(|overrides| overrides.reasoning_effort.as_deref()),
Some("high")
);
assert_eq!(
merged
.get("sid-1")
.and_then(|overrides| overrides.station_name.as_deref()),
Some("right")
);
assert_eq!(
merged
.get("sid-1")
.and_then(|overrides| overrides.model.as_deref()),
Some("gpt-5.4")
);
assert_eq!(
merged
.get("sid-1")
.and_then(|overrides| overrides.service_tier.as_deref()),
Some("priority")
);
assert_eq!(
merged
.get("sid-2")
.and_then(|overrides| overrides.model.as_deref()),
Some("gpt-5.4-mini")
);
assert!(
merged
.get("sid-2")
.is_some_and(|overrides| overrides.reasoning_effort.is_none())
);
});
}
#[test]
fn provider_balance_snapshots_are_recorded_and_refreshed() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
state
.record_provider_balance_snapshot(
"codex",
ProviderBalanceSnapshot {
provider_id: "packycode".to_string(),
station_name: Some("right".to_string()),
upstream_index: Some(2),
source: "usage_provider:budget_http_json".to_string(),
fetched_at_ms: 10,
stale_after_ms: Some(0),
stale: false,
status: BalanceSnapshotStatus::Unknown,
exhausted: Some(false),
exhaustion_affects_routing: true,
plan_name: None,
total_balance_usd: Some("3.5".to_string()),
subscription_balance_usd: None,
paygo_balance_usd: None,
monthly_budget_usd: Some("5".to_string()),
monthly_spent_usd: Some("1.5".to_string()),
quota_period: None,
quota_remaining_usd: None,
quota_limit_usd: None,
quota_used_usd: None,
unlimited_quota: None,
total_used_usd: None,
today_used_usd: None,
total_requests: None,
today_requests: None,
total_tokens: None,
today_tokens: None,
error: None,
},
)
.await;
let view = state.get_provider_balance_view("codex").await;
let balances = view.get("right").expect("station balance");
assert_eq!(balances.len(), 1);
assert_eq!(balances[0].provider_id, "packycode");
assert_eq!(balances[0].status, BalanceSnapshotStatus::Stale);
assert_eq!(balances[0].exhausted, Some(false));
let summary = state
.get_provider_balance_summary_view("codex")
.await
.get("right")
.cloned()
.expect("station balance summary");
assert_eq!(summary.snapshots, 1);
assert_eq!(summary.stale, 1);
assert_eq!(summary.routing_snapshots, 1);
});
}
#[test]
fn provider_balance_snapshots_keep_multiple_providers_per_upstream() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
for (provider_id, status) in [
("general", BalanceSnapshotStatus::Error),
("newapi", BalanceSnapshotStatus::Ok),
] {
state
.record_provider_balance_snapshot(
"codex",
ProviderBalanceSnapshot {
provider_id: provider_id.to_string(),
station_name: Some("routing".to_string()),
upstream_index: Some(1),
source: "usage_provider:test".to_string(),
fetched_at_ms: 10,
stale_after_ms: None,
stale: false,
status,
exhausted: if status == BalanceSnapshotStatus::Ok {
Some(false)
} else {
None
},
exhaustion_affects_routing: true,
plan_name: None,
total_balance_usd: if status == BalanceSnapshotStatus::Ok {
Some("3.5".to_string())
} else {
None
},
subscription_balance_usd: None,
paygo_balance_usd: None,
monthly_budget_usd: None,
monthly_spent_usd: None,
quota_period: None,
quota_remaining_usd: None,
quota_limit_usd: None,
quota_used_usd: None,
unlimited_quota: None,
total_used_usd: None,
today_used_usd: None,
total_requests: None,
today_requests: None,
total_tokens: None,
today_tokens: None,
error: if status == BalanceSnapshotStatus::Error {
Some("decode failed".to_string())
} else {
None
},
},
)
.await;
}
let view = state.get_provider_balance_view("codex").await;
let balances = view.get("routing").expect("station balance");
assert_eq!(balances.len(), 2);
assert_eq!(
balances
.iter()
.map(|snapshot| snapshot.provider_id.as_str())
.collect::<Vec<_>>(),
vec!["general", "newapi"]
);
});
}
#[test]
fn prune_runtime_observability_keeps_catalog_provider_balances_on_routing_layout_change() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
let mut initial_mgr = ServiceConfigManager::default();
initial_mgr.configs.insert(
"routing".to_string(),
ServiceConfig {
name: "routing".to_string(),
alias: None,
enabled: true,
level: 1,
upstreams: vec![
UpstreamConfig {
base_url: "https://input.example/v1".to_string(),
auth: UpstreamAuth::default(),
tags: HashMap::from([("provider_id".to_string(), "input".to_string())]),
supported_models: HashMap::new(),
model_mapping: HashMap::new(),
},
UpstreamConfig {
base_url: "https://backup.example/v1".to_string(),
auth: UpstreamAuth::default(),
tags: HashMap::from([(
"provider_id".to_string(),
"backup".to_string(),
)]),
supported_models: HashMap::new(),
model_mapping: HashMap::new(),
},
],
},
);
state
.prune_runtime_observability_for_service("codex", &initial_mgr)
.await;
state
.record_provider_balance_snapshot(
"codex",
ProviderBalanceSnapshot {
provider_id: "input".to_string(),
station_name: Some("input".to_string()),
upstream_index: Some(0),
source: "usage_provider:test".to_string(),
fetched_at_ms: 10,
stale_after_ms: None,
stale: false,
status: BalanceSnapshotStatus::Ok,
exhausted: Some(false),
exhaustion_affects_routing: true,
total_balance_usd: Some("3.5".to_string()),
..ProviderBalanceSnapshot::default()
},
)
.await;
state
.record_provider_balance_snapshot(
"codex",
ProviderBalanceSnapshot {
provider_id: "input".to_string(),
station_name: Some("routing".to_string()),
upstream_index: Some(0),
source: "usage_provider:test".to_string(),
fetched_at_ms: 10,
stale_after_ms: None,
stale: false,
status: BalanceSnapshotStatus::Ok,
exhausted: Some(false),
exhaustion_affects_routing: true,
total_balance_usd: Some("3.5".to_string()),
..ProviderBalanceSnapshot::default()
},
)
.await;
let mut pinned_mgr = ServiceConfigManager::default();
pinned_mgr.configs.insert(
"routing".to_string(),
ServiceConfig {
name: "routing".to_string(),
alias: None,
enabled: true,
level: 1,
upstreams: vec![UpstreamConfig {
base_url: "https://input.example/v1".to_string(),
auth: UpstreamAuth::default(),
tags: HashMap::from([("provider_id".to_string(), "input".to_string())]),
supported_models: HashMap::new(),
model_mapping: HashMap::new(),
}],
},
);
state
.prune_runtime_observability_for_service("codex", &pinned_mgr)
.await;
let view = state.get_provider_balance_view("codex").await;
assert!(view.contains_key("input"));
assert!(!view.contains_key("routing"));
});
}
#[test]
fn prune_runtime_observability_removes_stale_service_keys() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
state
.set_station_enabled_override("codex", "old".to_string(), false, 1)
.await;
state
.set_upstream_enabled_override(
"codex",
"https://old.example/v1".to_string(),
false,
1,
)
.await;
state
.set_provider_endpoint_enabled_override(
"codex",
ProviderEndpointKey::new("codex", "provider-old", "default"),
false,
1,
)
.await;
state
.set_provider_endpoint_runtime_state_override(
"codex",
ProviderEndpointKey::new("codex", "provider-old", "default"),
RuntimeConfigState::BreakerOpen,
1,
)
.await;
state
.record_station_health(
"codex",
"old".to_string(),
StationHealth {
checked_at_ms: 10,
upstreams: vec![UpstreamHealth {
base_url: "https://old.example/v1".to_string(),
ok: Some(false),
status_code: Some(500),
latency_ms: Some(10),
error: Some("boom".to_string()),
passive: None,
}],
},
)
.await;
state
.record_passive_upstream_failure(PassiveUpstreamFailureRecord {
service_name: "codex".to_string(),
station_name: "old".to_string(),
base_url: "https://old.example/v1".to_string(),
status_code: Some(500),
error_class: Some("upstream_transport_error".to_string()),
error: Some("boom".to_string()),
now_ms: 20,
})
.await;
state
.record_provider_balance_snapshot(
"codex",
ProviderBalanceSnapshot {
provider_id: "provider-old".to_string(),
station_name: Some("old".to_string()),
upstream_index: Some(0),
source: "usage_provider:budget_http_json".to_string(),
fetched_at_ms: 10,
stale_after_ms: None,
stale: false,
status: BalanceSnapshotStatus::Ok,
exhausted: Some(false),
exhaustion_affects_routing: true,
plan_name: None,
total_balance_usd: Some("3.5".to_string()),
subscription_balance_usd: None,
paygo_balance_usd: None,
monthly_budget_usd: None,
monthly_spent_usd: None,
quota_period: None,
quota_remaining_usd: None,
quota_limit_usd: None,
quota_used_usd: None,
unlimited_quota: None,
total_used_usd: None,
today_used_usd: None,
total_requests: None,
today_requests: None,
total_tokens: None,
today_tokens: None,
error: None,
},
)
.await;
let request_id = state
.begin_request(
"codex",
"POST",
"/v1/responses",
Some("sid-old".to_string()),
None,
None,
None,
Some("gpt-5".to_string()),
None,
None,
30,
)
.await;
state
.update_request_route(
request_id,
Some("old".to_string()),
Some("provider-old".to_string()),
"https://old.example/v1".to_string(),
None,
)
.await;
state
.finish_request(FinishRequestParams {
id: request_id,
status_code: 200,
duration_ms: 5,
ended_at_ms: 35,
observed_service_tier: None,
usage: None,
retry: None,
ttfb_ms: None,
streaming: false,
})
.await;
let mut mgr = ServiceConfigManager::default();
mgr.configs.insert(
"new".to_string(),
ServiceConfig {
name: "new".to_string(),
alias: None,
enabled: true,
level: 1,
upstreams: vec![UpstreamConfig {
base_url: "https://new.example/v1".to_string(),
auth: UpstreamAuth::default(),
tags: HashMap::from([(
"provider_id".to_string(),
"provider-new".to_string(),
)]),
supported_models: HashMap::new(),
model_mapping: HashMap::new(),
}],
},
);
state
.prune_runtime_observability_for_service("codex", &mgr)
.await;
assert!(state.get_station_meta_overrides("codex").await.is_empty());
assert!(state.get_upstream_meta_overrides("codex").await.is_empty());
assert!(state.get_station_health("codex").await.is_empty());
assert!(state.get_provider_balance_view("codex").await.is_empty());
let rollup = state.get_usage_rollup_view("codex", 10, 10).await;
assert!(rollup.by_config.is_empty());
assert!(rollup.by_provider.is_empty());
});
}
#[test]
fn get_upstream_meta_overrides_merges_endpoint_and_legacy_base_url_entries() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
state
.set_provider_endpoint_enabled_override(
"codex",
ProviderEndpointKey::new("codex", "alpha", "default"),
false,
1,
)
.await;
state
.set_provider_endpoint_runtime_state_override(
"codex",
ProviderEndpointKey::new("codex", "alpha", "default"),
RuntimeConfigState::BreakerOpen,
2,
)
.await;
state
.set_upstream_enabled_override(
"codex",
"https://legacy.example/v1".to_string(),
true,
3,
)
.await;
state
.set_upstream_runtime_state_override(
"codex",
"https://legacy.example/v1".to_string(),
RuntimeConfigState::Draining,
4,
)
.await;
let overrides = state.get_upstream_meta_overrides("codex").await;
assert_eq!(
overrides.get("codex/alpha/default"),
Some(&(Some(false), Some(RuntimeConfigState::BreakerOpen)))
);
assert_eq!(
overrides.get("https://legacy.example/v1"),
Some(&(Some(true), Some(RuntimeConfigState::Draining)))
);
});
}
#[test]
fn provider_endpoint_runtime_health_is_keyed_by_provider_endpoint() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
let monthly = ProviderEndpointKey::new("codex", "monthly", "default");
let fallback = ProviderEndpointKey::new("codex", "fallback", "default");
state
.record_provider_endpoint_attempt_success("codex", fallback.clone(), 10)
.await;
state
.set_provider_endpoint_usage_exhausted("codex", monthly.clone(), true)
.await;
state
.record_provider_endpoint_attempt_failure(
"codex",
monthly.clone(),
0,
crate::lb::CooldownBackoff {
factor: 1,
max_secs: 0,
},
)
.await;
state
.record_provider_endpoint_attempt_failure(
"codex",
monthly.clone(),
0,
crate::lb::CooldownBackoff {
factor: 1,
max_secs: 0,
},
)
.await;
state
.record_provider_endpoint_attempt_failure(
"codex",
monthly.clone(),
30,
crate::lb::CooldownBackoff {
factor: 1,
max_secs: 0,
},
)
.await;
let runtime = state
.route_plan_runtime_state_for_provider_endpoints("codex")
.await;
let monthly_state = runtime.provider_endpoint(&monthly);
assert_eq!(monthly_state.failure_count, crate::lb::FAILURE_THRESHOLD);
assert!(monthly_state.cooldown_active);
assert!(monthly_state.usage_exhausted);
assert_eq!(runtime.affinity_provider_endpoint(), Some(&fallback));
state
.set_provider_endpoint_runtime_state_override(
"codex",
fallback.clone(),
RuntimeConfigState::BreakerOpen,
20,
)
.await;
let runtime = state
.route_plan_runtime_state_for_provider_endpoints("codex")
.await;
assert!(runtime.provider_endpoint(&fallback).runtime_disabled);
});
}
#[test]
fn get_station_health_merges_passive_runtime_observations() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
state
.record_station_health(
"codex",
"right".to_string(),
StationHealth {
checked_at_ms: 10,
upstreams: vec![UpstreamHealth {
base_url: "https://right.example/v1".to_string(),
ok: Some(true),
status_code: Some(200),
latency_ms: Some(120),
error: None,
passive: None,
}],
},
)
.await;
state
.record_passive_upstream_failure(PassiveUpstreamFailureRecord {
service_name: "codex".to_string(),
station_name: "right".to_string(),
base_url: "https://right.example/v1".to_string(),
status_code: Some(500),
error_class: Some("cloudflare_timeout".to_string()),
error: Some("upstream timed out".to_string()),
now_ms: 20,
})
.await;
state
.record_passive_upstream_success(
"codex",
"right",
"https://backup.example/v1",
Some(200),
30,
)
.await;
let health = state.get_station_health("codex").await;
let right = health.get("right").expect("right health");
assert_eq!(right.checked_at_ms, 30);
assert_eq!(right.upstreams.len(), 2);
let primary = right
.upstreams
.iter()
.find(|upstream| upstream.base_url == "https://right.example/v1")
.expect("primary upstream");
assert_eq!(primary.ok, Some(true));
let primary_passive = primary.passive.as_ref().expect("primary passive");
assert_eq!(primary_passive.state, PassiveHealthState::Degraded);
assert_eq!(primary_passive.score, 50);
assert_eq!(primary_passive.last_status_code, Some(500));
assert_eq!(
primary_passive.last_error_class.as_deref(),
Some("cloudflare_timeout")
);
let backup = right
.upstreams
.iter()
.find(|upstream| upstream.base_url == "https://backup.example/v1")
.expect("backup upstream");
assert_eq!(backup.ok, None);
let backup_passive = backup.passive.as_ref().expect("backup passive");
assert_eq!(backup_passive.state, PassiveHealthState::Healthy);
assert_eq!(backup_passive.score, 100);
assert_eq!(backup_passive.last_status_code, Some(200));
});
}
#[test]
fn passive_health_success_recovers_after_failure() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
state
.record_passive_upstream_failure(PassiveUpstreamFailureRecord {
service_name: "codex".to_string(),
station_name: "right".to_string(),
base_url: "https://right.example/v1".to_string(),
status_code: Some(500),
error_class: Some("cloudflare_timeout".to_string()),
error: Some("upstream timed out".to_string()),
now_ms: 10,
})
.await;
state
.record_passive_upstream_success(
"codex",
"right",
"https://right.example/v1",
Some(200),
20,
)
.await;
let health = state.get_station_health("codex").await;
let right = health.get("right").expect("right health");
let upstream = right.upstreams.first().expect("upstream");
let passive = upstream.passive.as_ref().expect("passive");
assert_eq!(passive.state, PassiveHealthState::Healthy);
assert_eq!(passive.score, 100);
assert_eq!(passive.consecutive_failures, 0);
assert_eq!(passive.last_success_at_ms, Some(20));
assert_eq!(passive.last_failure_at_ms, Some(10));
assert_eq!(passive.last_error_class, None);
assert_eq!(passive.last_error, None);
});
}
#[test]
fn apply_session_profile_binding_uses_inherited_profile_values() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new();
let mut mgr = ServiceConfigManager::default();
mgr.configs.insert(
"right".to_string(),
ServiceConfig {
name: "right".to_string(),
alias: None,
enabled: true,
level: 1,
upstreams: vec![UpstreamConfig {
base_url: "https://right.example/v1".to_string(),
auth: UpstreamAuth::default(),
tags: HashMap::from([
("supports_reasoning_effort".to_string(), "true".to_string()),
("supports_service_tier".to_string(), "true".to_string()),
]),
supported_models: HashMap::from([("gpt-5.4".to_string(), true)]),
model_mapping: HashMap::new(),
}],
},
);
mgr.profiles.insert(
"base".to_string(),
crate::config::ServiceControlProfile {
extends: None,
station: Some("right".to_string()),
model: Some("gpt-5.4".to_string()),
reasoning_effort: None,
service_tier: Some("priority".to_string()),
},
);
mgr.profiles.insert(
"fast".to_string(),
crate::config::ServiceControlProfile {
extends: Some("base".to_string()),
station: None,
model: None,
reasoning_effort: Some("low".to_string()),
service_tier: None,
},
);
state
.apply_session_profile_binding(
"codex",
&mgr,
"sid-inherited".to_string(),
"fast".to_string(),
100,
)
.await
.expect("apply inherited profile");
let binding = state
.get_session_binding("sid-inherited")
.await
.expect("binding exists");
assert_eq!(binding.profile_name.as_deref(), Some("fast"));
assert_eq!(binding.station_name.as_deref(), Some("right"));
assert_eq!(binding.model.as_deref(), Some("gpt-5.4"));
assert_eq!(binding.reasoning_effort.as_deref(), Some("low"));
assert_eq!(binding.service_tier.as_deref(), Some("priority"));
});
}
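    // Manual overrides expire on their own TTL, but a sticky binding with no
    // opt-in binding TTL must survive the same prune pass.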
#[test]
fn prune_periodic_keeps_sticky_binding_after_manual_override_ttl_expires() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(1, 0, 2_000));
state
.set_session_model_override("sid-sticky".to_string(), "gpt-5.4".to_string(), 0)
.await;
state
.set_session_binding(SessionBinding {
session_id: "sid-sticky".to_string(),
profile_name: Some("daily".to_string()),
station_name: Some("right".to_string()),
model: Some("gpt-5.4".to_string()),
reasoning_effort: Some("medium".to_string()),
service_tier: Some("default".to_string()),
continuity_mode: SessionContinuityMode::DefaultProfile,
created_at_ms: 0,
updated_at_ms: 0,
last_seen_ms: 0,
})
.await;
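            // The override was set at t=0 with a short override TTL while the
            // binding TTL is disabled, so prune should drop only the override.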
state.prune_periodic().await;
assert!(
state
.get_session_model_override("sid-sticky")
.await
.is_none()
);
assert!(state.get_session_binding("sid-sticky").await.is_some());
});
}
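    // With a binding TTL opted in, the same prune pass must expire the binding.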
#[test]
fn prune_periodic_honors_opt_in_binding_ttl() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(0, 1, 2_000));
state
.set_session_binding(SessionBinding {
session_id: "sid-expire".to_string(),
profile_name: Some("daily".to_string()),
station_name: Some("right".to_string()),
model: Some("gpt-5.4".to_string()),
reasoning_effort: Some("medium".to_string()),
service_tier: Some("default".to_string()),
continuity_mode: SessionContinuityMode::DefaultProfile,
created_at_ms: 0,
updated_at_ms: 0,
last_seen_ms: 0,
})
.await;
state.prune_periodic().await;
assert!(state.get_session_binding("sid-expire").await.is_none());
});
}
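    // Route-affinity TTLs are enforced lazily on read: an expired entry is
    // invisible to both point lookups and listings without a prune pass.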
#[test]
fn session_route_affinity_ttl_is_enforced_on_read() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let mut policy = test_runtime_policy(0, 0, 2_000);
policy.session_route_affinity_ttl_ms = 1;
let state = ProxyState::new_with_runtime_policy(None, policy);
state
.record_session_route_affinity_success(
"sid-expire",
SessionRouteAffinityTarget {
route_graph_key: "graph".to_string(),
provider_endpoint: ProviderEndpointKey::new("codex", "monthly", "default"),
upstream_base_url: "https://monthly.example/v1".to_string(),
route_path: vec!["monthly_first".to_string(), "monthly".to_string()],
},
Some("first_success".to_string()),
0,
)
.await;
assert!(
state
.get_session_route_affinity("sid-expire")
.await
.is_none()
);
assert!(state.list_session_route_affinities().await.is_empty());
});
}
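    // When sticky bindings exceed the configured cap, prune evicts the entries
    // with the oldest last_seen timestamps first.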
#[test]
fn prune_periodic_caps_sticky_bindings_by_last_seen() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(0, 0, 2));
for idx in 0..3 {
state
.set_session_binding(SessionBinding {
session_id: format!("sid-{idx}"),
profile_name: Some("daily".to_string()),
station_name: Some("right".to_string()),
model: Some("gpt-5.4".to_string()),
reasoning_effort: Some("medium".to_string()),
service_tier: Some("default".to_string()),
continuity_mode: SessionContinuityMode::DefaultProfile,
created_at_ms: idx,
updated_at_ms: idx,
last_seen_ms: idx,
})
.await;
}
state.prune_periodic().await;
assert!(state.get_session_binding("sid-0").await.is_none());
assert!(state.get_session_binding("sid-1").await.is_some());
assert!(state.get_session_binding("sid-2").await.is_some());
});
}
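    // Failed transcript-path lookups should be cached as explicit negative
    // entries (None) rather than left absent from the cache.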
#[test]
fn transcript_path_cache_records_negative_lookups() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
let state = ProxyState::new_with_runtime_policy(None, test_runtime_policy(0, 0, 2_000));
let paths = state
.resolve_host_transcript_paths_cached(&["missing-session".to_string()])
.await;
assert_eq!(paths.get("missing-session"), Some(&None));
let cache = state.session_transcript_path_cache.read().await;
assert!(cache.contains_key("missing-session"));
});
}
}