Skip to main content

codex_helper_core/state/
session_identity.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5use crate::config::ServiceConfigManager;
6use crate::pricing::CostBreakdown;
7use crate::runtime_identity::ProviderEndpointKey;
8use crate::sessions;
9use crate::usage::{CacheInputAccounting, UsageMetrics};
10
/// Serde `skip_serializing_if` predicate: returns `true` when the flag is unset.
///
/// Takes `&bool` because serde passes the field by reference.
fn bool_is_false(value: &bool) -> bool {
    let is_set = *value;
    !is_set
}
14
/// Returns `true` when the tier, ignoring surrounding whitespace and ASCII
/// case, is exactly `priority`. A missing tier is never fast.
fn service_tier_is_fast(value: Option<&str>) -> bool {
    match value {
        Some(tier) => tier.trim().eq_ignore_ascii_case("priority"),
        None => false,
    }
}
20
/// Estimates the generation phase of a request in milliseconds.
///
/// * A zero `duration_ms` yields `None`.
/// * When `ttfb_ms` lies strictly between zero and `duration_ms`, it is
///   subtracted from the duration.
/// * Otherwise (missing, zero, or not smaller than the duration) the whole
///   duration is returned unchanged.
fn generation_ms_from_duration(duration_ms: u64, ttfb_ms: Option<u64>) -> Option<u64> {
    if duration_ms == 0 {
        return None;
    }
    let generation = match ttfb_ms {
        // The range check guarantees `ttfb < duration_ms`, so this cannot underflow.
        Some(ttfb) if (1..duration_ms).contains(&ttfb) => duration_ms - ttfb,
        _ => duration_ms,
    };
    Some(generation)
}
32
/// Serde default for `RequestObservability::attempt_count`: a request counts
/// as one attempt when the field is absent from the input.
fn default_attempt_count() -> u32 {
    1_u32
}
36
/// Derived timing and retry summary for a [`FinishedRequest`].
///
/// Values are computed by `RequestObservability::from_finished_request`.
/// When serialized, `None` options and `false` flags are omitted; when
/// deserialized, missing fields fall back to their serde defaults.
/// `Eq` is not derived because of the `f64` field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RequestObservability {
    /// The request's `trace_id`, falling back to a previously stored value.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    /// Copy of the request's `duration_ms`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duration_ms: Option<u64>,
    /// The request's `ttfb_ms`, kept only when greater than zero.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ttfb_ms: Option<u64>,
    /// Result of `generation_ms_from_duration(duration_ms, ttfb_ms)`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub generation_ms: Option<u64>,
    /// `usage.output_tokens` divided by `generation_ms` expressed in seconds;
    /// only finite, strictly positive rates are kept.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_tokens_per_second: Option<f64>,
    /// Attempt count from the retry info, never below 1; defaults to 1 when
    /// absent from the input.
    #[serde(default = "default_attempt_count")]
    pub attempt_count: u32,
    /// Number of route attempts reported by the retry info.
    #[serde(default)]
    pub route_attempt_count: usize,
    /// `true` when `attempt_count > 1`.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub retried: bool,
    /// `true` when some route attempt named a station other than the final one.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub cross_station_failover: bool,
    /// `true` when retried with station context but without cross-station failover.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub same_station_retry: bool,
    /// `true` when the decided or observed service tier is `priority`.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub fast_mode: bool,
    /// `true` when the request or its stored observability was marked streaming.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub streaming: bool,
}
64
65impl Default for RequestObservability {
66    fn default() -> Self {
67        Self {
68            trace_id: None,
69            duration_ms: None,
70            ttfb_ms: None,
71            generation_ms: None,
72            output_tokens_per_second: None,
73            attempt_count: 1,
74            route_attempt_count: 0,
75            retried: false,
76            cross_station_failover: false,
77            same_station_retry: false,
78            fast_mode: false,
79            streaming: false,
80        }
81    }
82}
83
/// Snapshot of a request that is tracked as active.
///
/// Optional fields are omitted from serialized output when `None`.
/// `build_session_identity_cards_from_parts` groups these by `session_id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ActiveRequest {
    pub id: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    /// Grouping key for identity cards; `None` requests share one card.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_addr: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service_tier: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub upstream_base_url: Option<String>,
    /// Passed to `update_card_route_decision` when building identity cards.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub route_decision: Option<RouteDecisionProvenance>,
    pub service: String,
    pub method: String,
    pub path: String,
    /// Feeds the card's `active_started_at_ms_min` (minimum over active requests).
    pub started_at_ms: u64,
}
116
/// Record of a completed request, including usage, cost, retry info and the
/// derived [`RequestObservability`] summary.
///
/// Optional fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FinishedRequest {
    pub id: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_addr: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    /// Checked (together with the route decision) to derive `fast_mode`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service_tier: Option<String>,
    /// Final station; compared against retry route attempts to detect failover.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub upstream_base_url: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub route_decision: Option<RouteDecisionProvenance>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub usage: Option<UsageMetrics>,
    /// Omitted from output when `CostBreakdown::is_unknown` reports true.
    #[serde(default, skip_serializing_if = "CostBreakdown::is_unknown")]
    pub cost: CostBreakdown,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retry: Option<crate::logging::RetryInfo>,
    /// Stored summary; `refresh_observability` recomputes it from the raw fields.
    #[serde(default)]
    pub observability: RequestObservability,
    /// Selects the cache accounting rules via `CacheInputAccounting::for_service`.
    pub service: String,
    pub method: String,
    pub path: String,
    pub status_code: u16,
    pub duration_ms: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttfb_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub streaming: bool,
    pub ended_at_ms: u64,
}
163
164impl FinishedRequest {
165    pub fn cache_input_accounting(&self) -> CacheInputAccounting {
166        CacheInputAccounting::for_service(&self.service)
167    }
168
169    pub fn cache_hit_rate(&self) -> Option<f64> {
170        self.usage
171            .as_ref()
172            .and_then(|usage| usage.cache_hit_rate_with_accounting(self.cache_input_accounting()))
173    }
174
175    pub fn observability_view(&self) -> RequestObservability {
176        RequestObservability::from_finished_request(self)
177    }
178
179    pub fn refresh_observability(&mut self) {
180        self.observability = self.observability_view();
181    }
182
183    pub fn generation_ms(&self) -> Option<u64> {
184        self.observability_view().generation_ms
185    }
186
187    pub fn output_tokens_per_second(&self) -> Option<f64> {
188        self.observability_view().output_tokens_per_second
189    }
190
191    pub fn attempt_count(&self) -> u32 {
192        self.observability_view().attempt_count
193    }
194
195    pub fn is_fast_mode(&self) -> bool {
196        self.observability_view().fast_mode
197    }
198
199    pub fn crossed_station_boundary(&self) -> bool {
200        self.observability_view().cross_station_failover
201    }
202}
203
204impl RequestObservability {
205    pub fn from_finished_request(request: &FinishedRequest) -> Self {
206        let retry = request.retry.as_ref();
207        let attempt_count = retry.map(|retry| retry.attempts.max(1)).unwrap_or(1);
208        let route_attempts = retry
209            .map(|retry| retry.route_attempts_or_derived())
210            .unwrap_or_default();
211        let route_attempt_count = route_attempts.len();
212        let final_station = request
213            .station_name
214            .as_deref()
215            .map(str::trim)
216            .filter(|station| !station.is_empty());
217        let has_station_context = final_station.is_some()
218            && route_attempts
219                .iter()
220                .any(|attempt| attempt.station_name.as_deref().is_some());
221        let cross_station_failover = final_station.is_some_and(|final_station| {
222            route_attempts
223                .iter()
224                .filter_map(|attempt| attempt.station_name.as_deref())
225                .any(|station| station != final_station)
226        });
227        let retried = attempt_count > 1;
228        let same_station_retry = retried && has_station_context && !cross_station_failover;
229        let generation_ms = generation_ms_from_duration(request.duration_ms, request.ttfb_ms);
230        let output_tokens_per_second = request.usage.as_ref().and_then(|usage| {
231            if usage.output_tokens == 0 {
232                return None;
233            }
234            let generation_ms = generation_ms?;
235            if generation_ms == 0 {
236                return None;
237            }
238            let rate = (usage.output_tokens as f64) / (generation_ms as f64 / 1000.0);
239            rate.is_finite().then_some(rate).filter(|rate| *rate > 0.0)
240        });
241        let decided_fast = request
242            .route_decision
243            .as_ref()
244            .and_then(|decision| decision.effective_service_tier.as_ref())
245            .is_some_and(|value| service_tier_is_fast(Some(value.value.as_str())));
246
247        Self {
248            trace_id: request
249                .trace_id
250                .clone()
251                .or_else(|| request.observability.trace_id.clone()),
252            duration_ms: Some(request.duration_ms),
253            ttfb_ms: request.ttfb_ms.filter(|value| *value > 0),
254            generation_ms,
255            output_tokens_per_second,
256            attempt_count,
257            route_attempt_count,
258            retried,
259            cross_station_failover,
260            same_station_retry,
261            fast_mode: decided_fast || service_tier_is_fast(request.service_tier.as_deref()),
262            streaming: request.streaming || request.observability.streaming,
263        }
264    }
265}
266
/// Plain parameter bundle describing how a request ended.
///
/// Not serialized (no serde derives). NOTE(review): presumably consumed by
/// the state code that turns an `ActiveRequest` into a `FinishedRequest`;
/// the consumer is not visible in this file section.
#[derive(Debug, Clone)]
pub struct FinishRequestParams {
    pub id: u64,
    pub status_code: u16,
    pub duration_ms: u64,
    pub ended_at_ms: u64,
    pub observed_service_tier: Option<String>,
    pub usage: Option<UsageMetrics>,
    pub retry: Option<crate::logging::RetryInfo>,
    pub ttfb_ms: Option<u64>,
    pub streaming: bool,
}
279
/// Aggregated statistics for one session.
///
/// Supplied to card building through `SessionIdentityCardBuildInputs::stats`
/// (presumably keyed by session id — confirm against the caller). `last_*`
/// options are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct SessionStats {
    pub turns_total: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_client_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_client_addr: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_service_tier: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_route_decision: Option<RouteDecisionProvenance>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_usage: Option<UsageMetrics>,
    pub total_usage: UsageMetrics,
    pub turns_with_usage: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_status: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_duration_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_ended_at_ms: Option<u64>,
    pub last_seen_ms: u64,
}
311
/// Continuity mode recorded on a [`SessionBinding`].
///
/// Serialized in snake_case (`default_profile`, `manual_profile`);
/// `DefaultProfile` is the `Default` value.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SessionContinuityMode {
    #[default]
    DefaultProfile,
    ManualProfile,
}
319
/// How much is known about a session on an identity card.
///
/// Serialized in snake_case (`observed_only`, `host_local_enriched`);
/// `ObservedOnly` is the `Default` value. See
/// `classify_session_observation_scope` for how a card is classified.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SessionObservationScope {
    #[default]
    ObservedOnly,
    HostLocalEnriched,
}
327
/// Profile-derived defaults bound to a session.
///
/// `apply_basic_effective_route` uses `model`, `reasoning_effort`,
/// `service_tier` and `station_name` as `ProfileDefault` values, and copies
/// `profile_name` / `continuity_mode` onto the identity card. Optional fields
/// are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SessionBinding {
    pub session_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub profile_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service_tier: Option<String>,
    pub continuity_mode: SessionContinuityMode,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    pub last_seen_ms: u64,
}
346
/// Where a [`ResolvedRouteValue`] came from. Serialized in snake_case.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RouteValueSource {
    /// Last value observed on a request (model, effort, service tier).
    RequestPayload,
    /// A per-session override on the identity card.
    SessionOverride,
    /// The global station override.
    GlobalOverride,
    /// A value taken from the session's binding.
    ProfileDefault,
    /// A model rewritten by an upstream's model mapping.
    StationMapping,
    /// Derived from runtime state: last observed station/upstream or the
    /// manager's active station.
    RuntimeFallback,
}
357
/// A resolved routing value together with the source it was taken from.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedRouteValue {
    /// The resolved value itself.
    pub value: String,
    /// Which resolution step produced `value`.
    pub source: RouteValueSource,
}
363
364impl ResolvedRouteValue {
365    pub(crate) fn new(value: impl Into<String>, source: RouteValueSource) -> Self {
366        Self {
367            value: value.into(),
368            source,
369        }
370    }
371}
372
/// Record of a routing decision: the effective values chosen and where each
/// came from.
///
/// Optional fields are omitted from serialized output when `None`, and
/// `route_path` when empty.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RouteDecisionProvenance {
    /// Decision timestamp; `update_card_route_decision` keeps the decision
    /// with the greatest (or equal, later-seen) value.
    pub decided_at_ms: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub binding_profile_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub binding_continuity_mode: Option<SessionContinuityMode>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_model: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_reasoning_effort: Option<ResolvedRouteValue>,
    /// Read by `RequestObservability::from_finished_request` to derive `fast_mode`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_service_tier: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_station: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_upstream_base_url: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub endpoint_id: Option<String>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub route_path: Vec<String>,
}
397
/// Stored route selection for a session, with selection/change timestamps.
///
/// `SessionRouteAffinityTarget::same_target` compares the first four fields
/// against a candidate target. `route_path` is omitted from serialized output
/// when empty.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SessionRouteAffinity {
    pub route_graph_key: String,
    pub provider_endpoint: ProviderEndpointKey,
    pub upstream_base_url: String,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub route_path: Vec<String>,
    pub last_selected_at_ms: u64,
    pub last_changed_at_ms: u64,
    pub change_reason: String,
}
409
/// Candidate route target, comparable against a stored
/// [`SessionRouteAffinity`] via `same_target`. Not serialized.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionRouteAffinityTarget {
    pub route_graph_key: String,
    pub provider_endpoint: ProviderEndpointKey,
    pub upstream_base_url: String,
    pub route_path: Vec<String>,
}
417
418impl SessionRouteAffinityTarget {
419    pub(crate) fn same_target(&self, affinity: &SessionRouteAffinity) -> bool {
420        self.route_graph_key == affinity.route_graph_key
421            && self.provider_endpoint == affinity.provider_endpoint
422            && self.upstream_base_url == affinity.upstream_base_url
423            && self.route_path == affinity.route_path
424    }
425}
426
/// Per-session summary combining active requests, last observed values,
/// binding information, overrides and the resolved effective route.
///
/// Optional fields are omitted from serialized output when `None`
/// (except `session_id`, which is always written).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct SessionIdentityCard {
    /// Session the card describes; requests without a session id share the
    /// `None` card.
    pub session_id: Option<String>,
    /// Defaults to `ObservedOnly` when missing from the input.
    #[serde(default)]
    pub observation_scope: SessionObservationScope,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub host_local_transcript_path: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_client_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_client_addr: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    /// Number of active requests counted for this session (saturating).
    pub active_count: u64,
    /// Smallest `started_at_ms` among the session's active requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_started_at_ms_min: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_status: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_duration_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_ended_at_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_service_tier: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_upstream_base_url: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_usage: Option<UsageMetrics>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_usage: Option<UsageMetrics>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub turns_total: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub turns_with_usage: Option<u64>,
    /// Copied from the session binding by `apply_basic_effective_route`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub binding_profile_name: Option<String>,
    /// Copied from the session binding by `apply_basic_effective_route`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub binding_continuity_mode: Option<SessionContinuityMode>,
    /// Most recent route decision seen, maintained by `update_card_route_decision`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_route_decision: Option<RouteDecisionProvenance>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub route_affinity: Option<SessionRouteAffinity>,
    // The `effective_*` fields below are written by
    // `apply_basic_effective_route` and may be refined by
    // `enrich_session_identity_cards_with_runtime`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_model: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_reasoning_effort: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_service_tier: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_station: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_upstream_base_url: Option<ResolvedRouteValue>,
    // The `override_*` fields below take precedence over every other source
    // when the effective values are resolved.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub override_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub override_station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub override_model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub override_service_tier: Option<String>,
}
496
/// Manually set per-session overrides; every field is optional and omitted
/// from serialized output when `None`. See `is_empty` for the "nothing set"
/// check.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct SessionManualOverrides {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub route_target: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service_tier: Option<String>,
}
510
511impl SessionManualOverrides {
512    pub fn is_empty(&self) -> bool {
513        self.reasoning_effort.is_none()
514            && self.station_name.is_none()
515            && self.route_target.is_none()
516            && self.model.is_none()
517            && self.service_tier.is_none()
518    }
519}
520
/// Stored reasoning-effort override for a session, with bookkeeping timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionEffortOverride {
    pub(super) effort: String,
    // NOTE(review): not read in the visible part of this file; confirm the
    // `allow` is still needed.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    pub(super) last_seen_ms: u64,
}
528
/// Stored station override for a session, with bookkeeping timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionStationOverride {
    pub(super) station_name: String,
    // NOTE(review): not read in the visible part of this file; confirm the
    // `allow` is still needed.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    pub(super) last_seen_ms: u64,
}
536
/// Stored route-target override for a session, with bookkeeping timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionRouteTargetOverride {
    pub(super) target: String,
    // NOTE(review): not read in the visible part of this file; confirm the
    // `allow` is still needed.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    pub(super) last_seen_ms: u64,
}
544
/// Stored model override for a session, with bookkeeping timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionModelOverride {
    pub(super) model: String,
    // NOTE(review): not read in the visible part of this file; confirm the
    // `allow` is still needed.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    pub(super) last_seen_ms: u64,
}
552
/// Stored service-tier override for a session, with bookkeeping timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionServiceTierOverride {
    pub(super) service_tier: String,
    // NOTE(review): not read in the visible part of this file; confirm the
    // `allow` is still needed.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    pub(super) last_seen_ms: u64,
}
560
/// Wrapper around a [`SessionBinding`] as stored by the parent module.
#[derive(Debug, Clone)]
pub(super) struct SessionBindingEntry {
    pub(super) binding: SessionBinding,
}
565
/// Cache entry holding a session's working directory lookup result.
///
/// NOTE(review): `cwd` being `None` presumably records a lookup that found
/// nothing; the code that fills the cache is not visible here — confirm.
#[derive(Debug, Clone)]
pub(super) struct SessionCwdCacheEntry {
    pub(super) cwd: Option<String>,
    pub(super) last_seen_ms: u64,
}
571
572fn empty_session_identity_card(session_id: Option<String>) -> SessionIdentityCard {
573    SessionIdentityCard {
574        session_id,
575        observation_scope: SessionObservationScope::ObservedOnly,
576        host_local_transcript_path: None,
577        last_client_name: None,
578        last_client_addr: None,
579        cwd: None,
580        active_count: 0,
581        active_started_at_ms_min: None,
582        last_status: None,
583        last_duration_ms: None,
584        last_ended_at_ms: None,
585        last_model: None,
586        last_reasoning_effort: None,
587        last_service_tier: None,
588        last_provider_id: None,
589        last_station_name: None,
590        last_upstream_base_url: None,
591        last_usage: None,
592        total_usage: None,
593        turns_total: None,
594        turns_with_usage: None,
595        binding_profile_name: None,
596        binding_continuity_mode: None,
597        last_route_decision: None,
598        route_affinity: None,
599        effective_model: None,
600        effective_reasoning_effort: None,
601        effective_service_tier: None,
602        effective_station: None,
603        effective_upstream_base_url: None,
604        override_effort: None,
605        override_station_name: None,
606        override_model: None,
607        override_service_tier: None,
608    }
609}
610
/// Trims `value` and returns it as an owned `String`, or `None` when the
/// input is absent or contains only whitespace.
fn non_empty_trimmed(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
617
618fn resolve_effective_observed_value(
619    override_value: Option<&str>,
620    observed_value: Option<&str>,
621    binding_value: Option<&str>,
622) -> Option<ResolvedRouteValue> {
623    if let Some(value) = non_empty_trimmed(override_value) {
624        return Some(ResolvedRouteValue::new(
625            value,
626            RouteValueSource::SessionOverride,
627        ));
628    }
629    let binding_value = non_empty_trimmed(binding_value);
630    if let Some(binding) = binding_value {
631        return Some(ResolvedRouteValue::new(
632            binding,
633            RouteValueSource::ProfileDefault,
634        ));
635    }
636    non_empty_trimmed(observed_value)
637        .map(|observed| ResolvedRouteValue::new(observed, RouteValueSource::RequestPayload))
638}
639
640fn classify_session_observation_scope(card: &SessionIdentityCard) -> SessionObservationScope {
641    if card.cwd.is_some() {
642        SessionObservationScope::HostLocalEnriched
643    } else {
644        SessionObservationScope::ObservedOnly
645    }
646}
647
648fn resolve_effective_station_value(
649    card: &SessionIdentityCard,
650    global_station_override: Option<&str>,
651    binding_station_name: Option<&str>,
652) -> Option<ResolvedRouteValue> {
653    if let Some(value) = non_empty_trimmed(card.override_station_name.as_deref()) {
654        return Some(ResolvedRouteValue::new(
655            value,
656            RouteValueSource::SessionOverride,
657        ));
658    }
659    if let Some(value) = non_empty_trimmed(global_station_override) {
660        return Some(ResolvedRouteValue::new(
661            value,
662            RouteValueSource::GlobalOverride,
663        ));
664    }
665    let binding = non_empty_trimmed(binding_station_name);
666    if let Some(binding) = binding {
667        return Some(ResolvedRouteValue::new(
668            binding,
669            RouteValueSource::ProfileDefault,
670        ));
671    }
672    non_empty_trimmed(card.last_station_name.as_deref())
673        .map(|observed| ResolvedRouteValue::new(observed, RouteValueSource::RuntimeFallback))
674}
675
676fn apply_basic_effective_route(
677    card: &mut SessionIdentityCard,
678    global_station_override: Option<&str>,
679    binding: Option<&SessionBinding>,
680) {
681    card.effective_model = resolve_effective_observed_value(
682        card.override_model.as_deref(),
683        card.last_model.as_deref(),
684        binding.and_then(|binding| binding.model.as_deref()),
685    );
686    card.effective_reasoning_effort = resolve_effective_observed_value(
687        card.override_effort.as_deref(),
688        card.last_reasoning_effort.as_deref(),
689        binding.and_then(|binding| binding.reasoning_effort.as_deref()),
690    );
691    card.effective_service_tier = resolve_effective_observed_value(
692        card.override_service_tier.as_deref(),
693        card.last_service_tier.as_deref(),
694        binding.and_then(|binding| binding.service_tier.as_deref()),
695    );
696    card.binding_profile_name = binding.and_then(|binding| binding.profile_name.clone());
697    card.binding_continuity_mode = binding.map(|binding| binding.continuity_mode);
698    card.effective_station = resolve_effective_station_value(
699        card,
700        global_station_override,
701        binding.and_then(|binding| binding.station_name.as_deref()),
702    );
703    card.effective_upstream_base_url = match (
704        card.effective_station.as_ref(),
705        non_empty_trimmed(card.last_station_name.as_deref()),
706        non_empty_trimmed(card.last_upstream_base_url.as_deref()),
707    ) {
708        (Some(station), Some(last_station), Some(upstream)) if station.value == last_station => {
709            Some(ResolvedRouteValue::new(
710                upstream,
711                RouteValueSource::RuntimeFallback,
712            ))
713        }
714        _ => None,
715    };
716}
717
/// Refines each card's effective route using the runtime configuration in `mgr`.
///
/// For every card, in order:
/// 1. If no effective station is set, use the name of `mgr.active_station()`
///    (tagged `RuntimeFallback`).
/// 2. If no effective upstream is set and the effective station is known to
///    `mgr` with exactly one upstream, use that upstream's base URL
///    (tagged `RuntimeFallback`).
/// 3. If the card has an effective model, the effective station equals the
///    last observed station, and the last observed upstream belongs to that
///    station, apply the upstream's model mapping; when the mapped model
///    differs, store it tagged `StationMapping`.
///
/// Cards that do not satisfy a step's preconditions are left unchanged by
/// that step.
pub fn enrich_session_identity_cards_with_runtime(
    cards: &mut [SessionIdentityCard],
    mgr: &ServiceConfigManager,
) {
    for card in cards {
        // Step 1: fall back to the manager's active station.
        if card.effective_station.is_none()
            && let Some(active) = mgr.active_station()
        {
            card.effective_station = Some(ResolvedRouteValue::new(
                active.name.clone(),
                RouteValueSource::RuntimeFallback,
            ));
        }

        let effective_station_name = card
            .effective_station
            .as_ref()
            .map(|value| value.value.as_str());
        // Step 2: a station with a single upstream leaves no ambiguity about
        // which base URL applies.
        if card.effective_upstream_base_url.is_none()
            && let Some(station_name) = effective_station_name
            && let Some(station) = mgr.station(station_name)
            && station.upstreams.len() == 1
        {
            card.effective_upstream_base_url = Some(ResolvedRouteValue::new(
                station.upstreams[0].base_url.clone(),
                RouteValueSource::RuntimeFallback,
            ));
        }

        // Step 3: model mapping. Each `let ... else` below skips the card
        // when a precondition is missing.
        let Some(model) = card
            .effective_model
            .as_ref()
            .map(|value| value.value.clone())
        else {
            continue;
        };
        let Some(station_name) = effective_station_name else {
            continue;
        };
        let Some(last_station_name) = card.last_station_name.as_deref() else {
            continue;
        };
        // Station names are compared verbatim here (no trimming).
        if last_station_name != station_name {
            continue;
        }
        let Some(last_upstream_base_url) = card.last_upstream_base_url.as_deref() else {
            continue;
        };
        let Some(station) = mgr.station(station_name) else {
            continue;
        };
        let Some(upstream) = station
            .upstreams
            .iter()
            .find(|upstream| upstream.base_url == last_upstream_base_url)
        else {
            continue;
        };

        let mapped = crate::model_routing::effective_model(&upstream.model_mapping, model.as_str());
        // Only record the mapping when it actually changes the model.
        if mapped != model {
            card.effective_model = Some(ResolvedRouteValue::new(
                mapped,
                RouteValueSource::StationMapping,
            ));
        }
    }
}
786
787fn session_identity_sort_key(card: &SessionIdentityCard) -> u64 {
788    card.last_ended_at_ms
789        .unwrap_or(0)
790        .max(card.active_started_at_ms_min.unwrap_or(0))
791}
792
793fn route_decision_at_ms(route_decision: Option<&RouteDecisionProvenance>) -> u64 {
794    route_decision
795        .map(|decision| decision.decided_at_ms)
796        .unwrap_or(0)
797}
798
799fn update_card_route_decision(
800    card: &mut SessionIdentityCard,
801    route_decision: Option<&RouteDecisionProvenance>,
802) {
803    let Some(route_decision) = route_decision.cloned() else {
804        return;
805    };
806    let current_at = route_decision_at_ms(card.last_route_decision.as_ref());
807    if current_at <= route_decision.decided_at_ms {
808        card.last_route_decision = Some(route_decision);
809    }
810}
811
/// Borrowed inputs for `build_session_identity_cards_from_parts`.
///
/// NOTE(review): the maps are presumably keyed by session id, and `overrides`
/// presumably holds reasoning-effort overrides (the other override maps are
/// named explicitly); their use lies beyond the visible part of the builder —
/// confirm against it.
pub struct SessionIdentityCardBuildInputs<'a> {
    /// Requests currently tracked as active.
    pub active: &'a [ActiveRequest],
    /// Recently finished requests.
    pub recent: &'a [FinishedRequest],
    pub overrides: &'a HashMap<String, String>,
    pub station_overrides: &'a HashMap<String, String>,
    pub model_overrides: &'a HashMap<String, String>,
    pub service_tier_overrides: &'a HashMap<String, String>,
    pub bindings: &'a HashMap<String, SessionBinding>,
    pub route_affinities: &'a HashMap<String, SessionRouteAffinity>,
    pub global_station_override: Option<&'a str>,
    pub stats: &'a HashMap<String, SessionStats>,
}
824
825pub fn build_session_identity_cards_from_parts(
826    inputs: SessionIdentityCardBuildInputs<'_>,
827) -> Vec<SessionIdentityCard> {
828    let SessionIdentityCardBuildInputs {
829        active,
830        recent,
831        overrides,
832        station_overrides,
833        model_overrides,
834        service_tier_overrides,
835        bindings,
836        route_affinities,
837        global_station_override,
838        stats,
839    } = inputs;
840
841    use std::collections::HashMap as StdHashMap;
842
843    let mut map: StdHashMap<Option<String>, SessionIdentityCard> = StdHashMap::new();
844
845    for req in active {
846        let key = req.session_id.clone();
847        let entry = map
848            .entry(key.clone())
849            .or_insert_with(|| empty_session_identity_card(key));
850
851        entry.active_count = entry.active_count.saturating_add(1);
852        entry.active_started_at_ms_min = Some(
853            entry
854                .active_started_at_ms_min
855                .unwrap_or(req.started_at_ms)
856                .min(req.started_at_ms),
857        );
858        if entry.cwd.is_none() {
859            entry.cwd = req.cwd.clone();
860        }
861        if entry.last_client_name.is_none() {
862            entry.last_client_name = req.client_name.clone();
863        }
864        if entry.last_client_addr.is_none() {
865            entry.last_client_addr = req.client_addr.clone();
866        }
867        if let Some(effort) = req.reasoning_effort.as_ref() {
868            entry.last_reasoning_effort = Some(effort.clone());
869        }
870        if let Some(service_tier) = req.service_tier.as_ref() {
871            entry.last_service_tier = Some(service_tier.clone());
872        }
873        if entry.last_model.is_none() {
874            entry.last_model = req.model.clone();
875        }
876        if entry.last_provider_id.is_none() {
877            entry.last_provider_id = req.provider_id.clone();
878        }
879        if entry.last_station_name.is_none() {
880            entry.last_station_name = req.station_name.clone();
881        }
882        if entry.last_upstream_base_url.is_none() {
883            entry.last_upstream_base_url = req.upstream_base_url.clone();
884        }
885        update_card_route_decision(entry, req.route_decision.as_ref());
886    }
887
888    for r in recent {
889        let key = r.session_id.clone();
890        let entry = map
891            .entry(key.clone())
892            .or_insert_with(|| empty_session_identity_card(key));
893
894        let should_update = entry
895            .last_ended_at_ms
896            .is_none_or(|prev| r.ended_at_ms >= prev);
897        if should_update {
898            entry.last_status = Some(r.status_code);
899            entry.last_duration_ms = Some(r.duration_ms);
900            entry.last_ended_at_ms = Some(r.ended_at_ms);
901            entry.last_client_name = r.client_name.clone().or(entry.last_client_name.clone());
902            entry.last_client_addr = r.client_addr.clone().or(entry.last_client_addr.clone());
903            entry.last_model = r.model.clone().or(entry.last_model.clone());
904            entry.last_reasoning_effort = r
905                .reasoning_effort
906                .clone()
907                .or(entry.last_reasoning_effort.clone());
908            entry.last_service_tier = r.service_tier.clone().or(entry.last_service_tier.clone());
909            entry.last_provider_id = r.provider_id.clone().or(entry.last_provider_id.clone());
910            entry.last_station_name = r.station_name.clone().or(entry.last_station_name.clone());
911            entry.last_upstream_base_url = r
912                .upstream_base_url
913                .clone()
914                .or(entry.last_upstream_base_url.clone());
915            entry.last_usage = r.usage.clone().or(entry.last_usage.clone());
916        }
917        if entry.cwd.is_none() {
918            entry.cwd = r.cwd.clone();
919        }
920        update_card_route_decision(entry, r.route_decision.as_ref());
921    }
922
923    for (sid, st) in stats {
924        let key = Some(sid.clone());
925        let entry = map
926            .entry(key.clone())
927            .or_insert_with(|| empty_session_identity_card(key));
928
929        if entry.turns_total.is_none() {
930            entry.turns_total = Some(st.turns_total);
931        }
932        if entry.last_client_name.is_none() {
933            entry.last_client_name = st.last_client_name.clone();
934        }
935        if entry.last_client_addr.is_none() {
936            entry.last_client_addr = st.last_client_addr.clone();
937        }
938        if entry.last_status.is_none() {
939            entry.last_status = st.last_status;
940        }
941        if entry.last_duration_ms.is_none() {
942            entry.last_duration_ms = st.last_duration_ms;
943        }
944        if entry.last_ended_at_ms.is_none() {
945            entry.last_ended_at_ms = st.last_ended_at_ms;
946        }
947        if entry.last_model.is_none() {
948            entry.last_model = st.last_model.clone();
949        }
950        if entry.last_reasoning_effort.is_none() {
951            entry.last_reasoning_effort = st.last_reasoning_effort.clone();
952        }
953        if entry.last_service_tier.is_none() {
954            entry.last_service_tier = st.last_service_tier.clone();
955        }
956        if entry.last_provider_id.is_none() {
957            entry.last_provider_id = st.last_provider_id.clone();
958        }
959        if entry.last_station_name.is_none() {
960            entry.last_station_name = st.last_station_name.clone();
961        }
962        if entry.last_usage.is_none() {
963            entry.last_usage = st.last_usage.clone();
964        }
965        if entry.total_usage.is_none() {
966            entry.total_usage = Some(st.total_usage.clone());
967        }
968        if entry.turns_with_usage.is_none() {
969            entry.turns_with_usage = Some(st.turns_with_usage);
970        }
971        update_card_route_decision(entry, st.last_route_decision.as_ref());
972    }
973
974    for (sid, eff) in overrides {
975        let key = Some(sid.clone());
976        let entry = map
977            .entry(key.clone())
978            .or_insert_with(|| empty_session_identity_card(key));
979        entry.override_effort = Some(eff.clone());
980    }
981
982    for (sid, station_name) in station_overrides {
983        let key = Some(sid.clone());
984        let entry = map
985            .entry(key.clone())
986            .or_insert_with(|| empty_session_identity_card(key));
987        entry.override_station_name = Some(station_name.clone());
988    }
989
990    for (sid, model) in model_overrides {
991        let key = Some(sid.clone());
992        let entry = map
993            .entry(key.clone())
994            .or_insert_with(|| empty_session_identity_card(key));
995        entry.override_model = Some(model.clone());
996    }
997
998    for (sid, service_tier) in service_tier_overrides {
999        let key = Some(sid.clone());
1000        let entry = map
1001            .entry(key.clone())
1002            .or_insert_with(|| empty_session_identity_card(key));
1003        entry.override_service_tier = Some(service_tier.clone());
1004    }
1005
1006    for (sid, affinity) in route_affinities {
1007        let key = Some(sid.clone());
1008        let entry = map
1009            .entry(key.clone())
1010            .or_insert_with(|| empty_session_identity_card(key));
1011        entry.route_affinity = Some(affinity.clone());
1012    }
1013
1014    let mut cards = map.into_values().collect::<Vec<_>>();
1015    for card in &mut cards {
1016        let binding = card
1017            .session_id
1018            .as_deref()
1019            .and_then(|session_id| bindings.get(session_id));
1020        apply_basic_effective_route(card, global_station_override, binding);
1021        card.observation_scope = classify_session_observation_scope(card);
1022    }
1023    cards.sort_by_key(|card| std::cmp::Reverse(session_identity_sort_key(card)));
1024    cards
1025}
1026
1027pub async fn enrich_session_identity_cards_with_host_transcripts(
1028    cards: &mut [SessionIdentityCard],
1029) {
1030    let session_ids = cards
1031        .iter()
1032        .filter_map(|card| card.session_id.as_deref())
1033        .map(str::to_owned)
1034        .collect::<Vec<_>>();
1035    if session_ids.is_empty() {
1036        return;
1037    }
1038
1039    let found = match sessions::find_codex_session_files_by_ids(&session_ids).await {
1040        Ok(found) => found,
1041        Err(_) => return,
1042    };
1043
1044    for card in cards {
1045        card.host_local_transcript_path = card
1046            .session_id
1047            .as_deref()
1048            .and_then(|sid| found.get(sid))
1049            .map(|path| path.to_string_lossy().to_string());
1050    }
1051}
1052
#[cfg(test)]
mod tests {
    use super::*;

    use crate::logging::RetryInfo;
    use crate::pricing::CostBreakdown;
    use crate::usage::UsageMetrics;

    /// Fixture: a streamed, successful request that needed two attempts
    /// (first against `backup`, then `primary`) and whose route decision
    /// resolved the service tier to `priority`.
    fn sample_finished_request() -> FinishedRequest {
        let route_decision = RouteDecisionProvenance {
            effective_service_tier: Some(ResolvedRouteValue {
                value: String::from("priority"),
                source: RouteValueSource::ProfileDefault,
            }),
            ..RouteDecisionProvenance::default()
        };
        let usage = UsageMetrics {
            output_tokens: 200,
            total_tokens: 200,
            ..Default::default()
        };
        let retry = RetryInfo {
            attempts: 2,
            upstream_chain: vec![
                String::from("backup:https://backup.example/v1 (idx=0) transport_error=timeout"),
                String::from("primary:https://primary.example/v1 (idx=0) status=200 class=-"),
            ],
            route_attempts: Vec::new(),
        };

        FinishedRequest {
            id: 7,
            trace_id: Some(String::from("codex-7")),
            session_id: Some(String::from("sid")),
            client_name: None,
            client_addr: None,
            cwd: None,
            model: Some(String::from("gpt-5")),
            reasoning_effort: None,
            service_tier: Some(String::from("default")),
            station_name: Some(String::from("primary")),
            provider_id: Some(String::from("primary-provider")),
            upstream_base_url: Some(String::from("https://primary.example/v1")),
            route_decision: Some(route_decision),
            usage: Some(usage),
            cost: CostBreakdown::default(),
            retry: Some(retry),
            observability: RequestObservability::default(),
            service: String::from("codex"),
            method: String::from("POST"),
            path: String::from("/v1/responses"),
            status_code: 200,
            duration_ms: 1_500,
            ttfb_ms: Some(500),
            streaming: true,
            ended_at_ms: 2_000,
        }
    }

    #[test]
    fn finished_request_observability_derives_canonical_request_facts() {
        let view = sample_finished_request().observability_view();

        // Identity and timing.
        assert_eq!(view.trace_id.as_deref(), Some("codex-7"));
        assert_eq!(view.duration_ms, Some(1_500));
        assert_eq!(view.ttfb_ms, Some(500));
        assert_eq!(view.generation_ms, Some(1_000));

        // Retry accounting.
        assert_eq!(view.attempt_count, 2);
        assert_eq!(view.route_attempt_count, 2);
        assert!(view.retried);
        assert!(view.cross_station_failover);

        // Mode flags and throughput.
        assert!(view.fast_mode);
        assert!(view.streaming);
        assert_eq!(view.output_tokens_per_second, Some(200.0));
    }

    #[test]
    fn finished_request_serializes_materialized_observability_for_operator_api() {
        let mut request = sample_finished_request();
        request.refresh_observability();

        let json = serde_json::to_value(&request).expect("finished request json");
        let observability = &json["observability"];

        assert_eq!(observability["trace_id"].as_str(), Some("codex-7"));
        assert_eq!(observability["generation_ms"].as_u64(), Some(1_000));
        assert_eq!(
            observability["output_tokens_per_second"].as_f64(),
            Some(200.0)
        );
        assert_eq!(observability["attempt_count"].as_u64(), Some(2));
        assert_eq!(observability["streaming"].as_bool(), Some(true));
    }

    #[test]
    fn finished_request_legacy_payload_still_derives_observability() {
        // Payload without `observability`, `streaming`, or retry data.
        let legacy_payload = serde_json::json!({
            "id": 8,
            "trace_id": "codex-8",
            "usage": {
                "output_tokens": 120,
                "total_tokens": 120
            },
            "cost": {},
            "service": "codex",
            "method": "POST",
            "path": "/v1/responses",
            "status_code": 200,
            "duration_ms": 1_500,
            "ttfb_ms": 300,
            "ended_at_ms": 2_500
        });
        let request: FinishedRequest =
            serde_json::from_value(legacy_payload).expect("legacy finished request");

        // Missing fields fall back to their defaults.
        assert_eq!(request.observability.attempt_count, 1);
        assert!(!request.streaming);

        // The derived view is still computed from the raw request fields.
        let view = request.observability_view();
        assert_eq!(view.generation_ms, Some(1_200));
        assert_eq!(view.output_tokens_per_second, Some(100.0));
        assert_eq!(view.attempt_count, 1);
        assert!(!view.fast_mode);
    }
}