1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5use crate::config::ServiceConfigManager;
6use crate::pricing::CostBreakdown;
7use crate::runtime_identity::ProviderEndpointKey;
8use crate::sessions;
9use crate::usage::{CacheInputAccounting, UsageMetrics};
10
/// Serde `skip_serializing_if` predicate: `true` when the flag is unset.
///
/// The argument is a reference because serde hands the field over by
/// reference.
fn bool_is_false(value: &bool) -> bool {
    matches!(*value, false)
}
14
/// Returns `true` when the tier, after trimming surrounding whitespace, equals
/// `"priority"` ignoring ASCII case. A missing tier is never fast.
fn service_tier_is_fast(value: Option<&str>) -> bool {
    match value {
        Some(tier) => tier.trim().eq_ignore_ascii_case("priority"),
        None => false,
    }
}
20
/// Derives the generation time from the total duration and the optional
/// time-to-first-byte, both in milliseconds.
///
/// * A zero duration yields `None`.
/// * When `ttfb_ms` is positive and strictly smaller than `duration_ms`, the
///   result is the remainder after subtracting it.
/// * Otherwise (missing, zero, or not smaller than the duration) the whole
///   duration is returned.
fn generation_ms_from_duration(duration_ms: u64, ttfb_ms: Option<u64>) -> Option<u64> {
    match (duration_ms, ttfb_ms) {
        (0, _) => None,
        (total, Some(first_byte)) if first_byte > 0 && first_byte < total => {
            // Cannot underflow: the guard ensures `first_byte < total`.
            Some(total - first_byte)
        }
        (total, _) => Some(total),
    }
}
32
/// Serde default for `RequestObservability::attempt_count`: a record that does
/// not carry the field is treated as a single attempt.
fn default_attempt_count() -> u32 {
    const SINGLE_ATTEMPT: u32 = 1;
    SINGLE_ATTEMPT
}
36
/// Derived timing, retry and routing summary for a single request.
///
/// Values are recomputed from a [`FinishedRequest`] by
/// [`RequestObservability::from_finished_request`]. When serializing, `None`
/// options and `false` flags are omitted; when deserializing, missing fields
/// fall back to their defaults (`attempt_count` defaults to 1).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RequestObservability {
    /// Trace identifier, when one was recorded.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    /// Total request duration in milliseconds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duration_ms: Option<u64>,
    /// Time to first byte in milliseconds; zero values are dropped when derived.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ttfb_ms: Option<u64>,
    /// Duration minus `ttfb_ms` when that is positive and smaller than the
    /// duration; otherwise the full duration. `None` for a zero duration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub generation_ms: Option<u64>,
    /// Output tokens divided by generation time in seconds; `None` when there
    /// are no output tokens or no usable generation time.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_tokens_per_second: Option<f64>,
    /// Number of attempts, at least 1 when derived from retry info.
    #[serde(default = "default_attempt_count")]
    pub attempt_count: u32,
    /// Number of route attempts reported by the retry info (0 without one).
    #[serde(default)]
    pub route_attempt_count: usize,
    /// `true` when `attempt_count` is greater than 1.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub retried: bool,
    /// `true` when some route attempt names a station other than the final one.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub cross_station_failover: bool,
    /// `true` when the request was retried, station information is available,
    /// and no cross-station failover was detected.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub same_station_retry: bool,
    /// `true` when the decided or observed service tier is `"priority"`.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub fast_mode: bool,
    /// `true` when the request was flagged as streaming.
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub streaming: bool,
}
64
65impl Default for RequestObservability {
66 fn default() -> Self {
67 Self {
68 trace_id: None,
69 duration_ms: None,
70 ttfb_ms: None,
71 generation_ms: None,
72 output_tokens_per_second: None,
73 attempt_count: 1,
74 route_attempt_count: 0,
75 retried: false,
76 cross_station_failover: false,
77 same_station_retry: false,
78 fast_mode: false,
79 streaming: false,
80 }
81 }
82}
83
/// Record of a request tracked from its start time (`started_at_ms`).
///
/// `build_session_identity_cards_from_parts` folds these records into
/// per-session cards: each one increments `active_count` and contributes its
/// client, model, tier, station and upstream details. Optional fields are
/// omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ActiveRequest {
    /// Request identifier.
    pub id: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    /// Session this request belongs to; `None` groups it with other
    /// session-less requests when cards are built.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_addr: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service_tier: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub upstream_base_url: Option<String>,
    /// Routing decision attached to this request, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub route_decision: Option<RouteDecisionProvenance>,
    pub service: String,
    pub method: String,
    pub path: String,
    /// Start timestamp in milliseconds.
    pub started_at_ms: u64,
}
116
/// Record of a completed request: identity and routing details plus status,
/// timing, usage, cost and retry information.
///
/// The `observability` field is a stored summary; the current view can always
/// be recomputed with [`FinishedRequest::observability_view`]. Optional fields
/// are omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FinishedRequest {
    /// Request identifier.
    pub id: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_addr: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    /// Service tier; `"priority"` (case-insensitive) marks the request as fast mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service_tier: Option<String>,
    /// Station that finally served the request; compared against per-attempt
    /// stations to detect cross-station failover.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub upstream_base_url: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub route_decision: Option<RouteDecisionProvenance>,
    /// Token usage, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub usage: Option<UsageMetrics>,
    /// Cost breakdown; omitted from the serialized form when unknown.
    #[serde(default, skip_serializing_if = "CostBreakdown::is_unknown")]
    pub cost: CostBreakdown,
    /// Retry details; the source for attempt counts and route attempts.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retry: Option<crate::logging::RetryInfo>,
    /// Stored observability summary; see `refresh_observability`.
    #[serde(default)]
    pub observability: RequestObservability,
    pub service: String,
    pub method: String,
    pub path: String,
    pub status_code: u16,
    /// Total duration in milliseconds.
    pub duration_ms: u64,
    /// Time to first byte in milliseconds, when measured.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttfb_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "bool_is_false")]
    pub streaming: bool,
    /// Completion timestamp in milliseconds.
    pub ended_at_ms: u64,
}
163
164impl FinishedRequest {
165 pub fn cache_input_accounting(&self) -> CacheInputAccounting {
166 CacheInputAccounting::for_service(&self.service)
167 }
168
169 pub fn cache_hit_rate(&self) -> Option<f64> {
170 self.usage
171 .as_ref()
172 .and_then(|usage| usage.cache_hit_rate_with_accounting(self.cache_input_accounting()))
173 }
174
175 pub fn observability_view(&self) -> RequestObservability {
176 RequestObservability::from_finished_request(self)
177 }
178
179 pub fn refresh_observability(&mut self) {
180 self.observability = self.observability_view();
181 }
182
183 pub fn generation_ms(&self) -> Option<u64> {
184 self.observability_view().generation_ms
185 }
186
187 pub fn output_tokens_per_second(&self) -> Option<f64> {
188 self.observability_view().output_tokens_per_second
189 }
190
191 pub fn attempt_count(&self) -> u32 {
192 self.observability_view().attempt_count
193 }
194
195 pub fn is_fast_mode(&self) -> bool {
196 self.observability_view().fast_mode
197 }
198
199 pub fn crossed_station_boundary(&self) -> bool {
200 self.observability_view().cross_station_failover
201 }
202}
203
204impl RequestObservability {
205 pub fn from_finished_request(request: &FinishedRequest) -> Self {
206 let retry = request.retry.as_ref();
207 let attempt_count = retry.map(|retry| retry.attempts.max(1)).unwrap_or(1);
208 let route_attempts = retry
209 .map(|retry| retry.route_attempts_or_derived())
210 .unwrap_or_default();
211 let route_attempt_count = route_attempts.len();
212 let final_station = request
213 .station_name
214 .as_deref()
215 .map(str::trim)
216 .filter(|station| !station.is_empty());
217 let has_station_context = final_station.is_some()
218 && route_attempts
219 .iter()
220 .any(|attempt| attempt.station_name.as_deref().is_some());
221 let cross_station_failover = final_station.is_some_and(|final_station| {
222 route_attempts
223 .iter()
224 .filter_map(|attempt| attempt.station_name.as_deref())
225 .any(|station| station != final_station)
226 });
227 let retried = attempt_count > 1;
228 let same_station_retry = retried && has_station_context && !cross_station_failover;
229 let generation_ms = generation_ms_from_duration(request.duration_ms, request.ttfb_ms);
230 let output_tokens_per_second = request.usage.as_ref().and_then(|usage| {
231 if usage.output_tokens == 0 {
232 return None;
233 }
234 let generation_ms = generation_ms?;
235 if generation_ms == 0 {
236 return None;
237 }
238 let rate = (usage.output_tokens as f64) / (generation_ms as f64 / 1000.0);
239 rate.is_finite().then_some(rate).filter(|rate| *rate > 0.0)
240 });
241 let decided_fast = request
242 .route_decision
243 .as_ref()
244 .and_then(|decision| decision.effective_service_tier.as_ref())
245 .is_some_and(|value| service_tier_is_fast(Some(value.value.as_str())));
246
247 Self {
248 trace_id: request
249 .trace_id
250 .clone()
251 .or_else(|| request.observability.trace_id.clone()),
252 duration_ms: Some(request.duration_ms),
253 ttfb_ms: request.ttfb_ms.filter(|value| *value > 0),
254 generation_ms,
255 output_tokens_per_second,
256 attempt_count,
257 route_attempt_count,
258 retried,
259 cross_station_failover,
260 same_station_retry,
261 fast_mode: decided_fast || service_tier_is_fast(request.service_tier.as_deref()),
262 streaming: request.streaming || request.observability.streaming,
263 }
264 }
265}
266
/// Bundle of values describing how a request ended (status, timing, usage,
/// retry details).
///
/// NOTE(review): the consumer of this struct is not visible in this part of
/// the file; presumably it is used to turn an active request into a
/// `FinishedRequest` — confirm against the caller.
#[derive(Debug, Clone)]
pub struct FinishRequestParams {
    /// Identifier of the request being finished.
    pub id: u64,
    pub status_code: u16,
    /// Total duration in milliseconds.
    pub duration_ms: u64,
    /// Completion timestamp in milliseconds.
    pub ended_at_ms: u64,
    pub observed_service_tier: Option<String>,
    pub usage: Option<UsageMetrics>,
    pub retry: Option<crate::logging::RetryInfo>,
    /// Time to first byte in milliseconds, when measured.
    pub ttfb_ms: Option<u64>,
    pub streaming: bool,
}
279
/// Per-session aggregate: turn counters, accumulated usage and the most
/// recently recorded request details.
///
/// `build_session_identity_cards_from_parts` uses these values only to fill
/// card fields that active and recent requests left empty. `None` options are
/// omitted from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct SessionStats {
    /// Total number of turns recorded for the session.
    pub turns_total: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_client_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_client_addr: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_service_tier: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_route_decision: Option<RouteDecisionProvenance>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_usage: Option<UsageMetrics>,
    /// Usage accumulated over the session.
    pub total_usage: UsageMetrics,
    /// Number of turns that reported usage.
    pub turns_with_usage: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_status: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_duration_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_ended_at_ms: Option<u64>,
    /// Timestamp (ms) of the last activity seen for the session.
    pub last_seen_ms: u64,
}
311
/// Continuity mode stored on a [`SessionBinding`].
///
/// Serialized in `snake_case` (`default_profile`, `manual_profile`); the
/// default is `DefaultProfile`.
// NOTE(review): presumably distinguishes bindings created from the default
// profile from bindings chosen manually — confirm against the code that
// creates bindings.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SessionContinuityMode {
    #[default]
    DefaultProfile,
    ManualProfile,
}
319
/// How much is known about a session shown on a [`SessionIdentityCard`].
///
/// Serialized in `snake_case`; the default is `ObservedOnly`.
/// `classify_session_observation_scope` picks `HostLocalEnriched` when the
/// card carries a `cwd`, and `ObservedOnly` otherwise.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SessionObservationScope {
    #[default]
    ObservedOnly,
    HostLocalEnriched,
}
327
/// Binding of a session to a profile and its routing preferences.
///
/// When effective route values are resolved, the binding's `model`,
/// `reasoning_effort`, `service_tier` and `station_name` are reported with
/// `RouteValueSource::ProfileDefault`. `None` options are omitted from the
/// serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SessionBinding {
    pub session_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub profile_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service_tier: Option<String>,
    pub continuity_mode: SessionContinuityMode,
    /// Creation timestamp in milliseconds.
    pub created_at_ms: u64,
    /// Last-update timestamp in milliseconds.
    pub updated_at_ms: u64,
    /// Timestamp (ms) of the last activity seen for the session.
    pub last_seen_ms: u64,
}
346
/// Origin of a [`ResolvedRouteValue`]. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RouteValueSource {
    /// The last value observed on the session's requests.
    RequestPayload,
    /// A per-session override.
    SessionOverride,
    /// The global station override.
    GlobalOverride,
    /// A value taken from the session's binding.
    ProfileDefault,
    /// A model name rewritten through an upstream's model mapping.
    StationMapping,
    /// A fallback taken from runtime state: the last observed station or
    /// upstream, or the configuration manager's active station.
    RuntimeFallback,
}
357
/// A resolved routing value together with the source it was taken from.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedRouteValue {
    /// The resolved value (for example a model or station name).
    pub value: String,
    /// Where `value` came from.
    pub source: RouteValueSource,
}
363
364impl ResolvedRouteValue {
365 pub(crate) fn new(value: impl Into<String>, source: RouteValueSource) -> Self {
366 Self {
367 value: value.into(),
368 source,
369 }
370 }
371}
372
/// Record of a routing decision: when it was made, which binding applied and
/// which effective values (with their sources) were chosen.
///
/// `decided_at_ms` is used to keep only the newest decision on a session
/// card. `None` options and an empty `route_path` are omitted from the
/// serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RouteDecisionProvenance {
    /// Timestamp (ms) at which the decision was made.
    pub decided_at_ms: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub binding_profile_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub binding_continuity_mode: Option<SessionContinuityMode>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_model: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_reasoning_effort: Option<ResolvedRouteValue>,
    /// Decided service tier; a `"priority"` value marks the request as fast mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_service_tier: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_station: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_upstream_base_url: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub endpoint_id: Option<String>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub route_path: Vec<String>,
}
397
/// Route target remembered for a session, with selection bookkeeping.
///
/// The first four fields identify the target and are what
/// `SessionRouteAffinityTarget::same_target` compares. An empty `route_path`
/// is omitted from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SessionRouteAffinity {
    pub route_graph_key: String,
    pub provider_endpoint: ProviderEndpointKey,
    pub upstream_base_url: String,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub route_path: Vec<String>,
    /// Timestamp (ms) of the most recent selection of this target.
    pub last_selected_at_ms: u64,
    /// Timestamp (ms) of the most recent change of target.
    pub last_changed_at_ms: u64,
    /// Reason recorded for the most recent change.
    pub change_reason: String,
}
409
/// Identity of a route target, without the timestamps and change reason kept
/// on [`SessionRouteAffinity`]. Not serializable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionRouteAffinityTarget {
    pub route_graph_key: String,
    pub provider_endpoint: ProviderEndpointKey,
    pub upstream_base_url: String,
    pub route_path: Vec<String>,
}
417
418impl SessionRouteAffinityTarget {
419 pub(crate) fn same_target(&self, affinity: &SessionRouteAffinity) -> bool {
420 self.route_graph_key == affinity.route_graph_key
421 && self.provider_endpoint == affinity.provider_endpoint
422 && self.upstream_base_url == affinity.upstream_base_url
423 && self.route_path == affinity.route_path
424 }
425}
426
/// Per-session summary assembled by `build_session_identity_cards_from_parts`.
///
/// Field groups:
/// * `last_*`, `cwd`, `active_*`: taken from active requests, recent finished
///   requests and stored session stats.
/// * `binding_*`, `effective_*`: resolved routing values, set by
///   `apply_basic_effective_route` and refined by
///   `enrich_session_identity_cards_with_runtime`.
/// * `override_*`: per-session overrides copied from the override maps.
///
/// `None` options are omitted from the serialized form, except `session_id`,
/// which is always written.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct SessionIdentityCard {
    /// Session identifier; `None` for the card that groups session-less requests.
    pub session_id: Option<String>,
    #[serde(default)]
    pub observation_scope: SessionObservationScope,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub host_local_transcript_path: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_client_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_client_addr: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    /// Number of active requests counted for the session.
    pub active_count: u64,
    /// Earliest `started_at_ms` among the session's active requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_started_at_ms_min: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_status: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_duration_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_ended_at_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_service_tier: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_upstream_base_url: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_usage: Option<UsageMetrics>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_usage: Option<UsageMetrics>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub turns_total: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub turns_with_usage: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub binding_profile_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub binding_continuity_mode: Option<SessionContinuityMode>,
    /// Newest route decision seen for the session (by `decided_at_ms`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_route_decision: Option<RouteDecisionProvenance>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub route_affinity: Option<SessionRouteAffinity>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_model: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_reasoning_effort: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_service_tier: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_station: Option<ResolvedRouteValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_upstream_base_url: Option<ResolvedRouteValue>,
    /// Per-session reasoning-effort override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub override_effort: Option<String>,
    /// Per-session station override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub override_station_name: Option<String>,
    /// Per-session model override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub override_model: Option<String>,
    /// Per-session service-tier override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub override_service_tier: Option<String>,
}
496
/// The set of manual overrides that can be attached to a session.
///
/// Every field is optional and omitted from the serialized form when `None`;
/// `is_empty` reports whether none of them is set.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct SessionManualOverrides {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub station_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub route_target: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service_tier: Option<String>,
}
510
511impl SessionManualOverrides {
512 pub fn is_empty(&self) -> bool {
513 self.reasoning_effort.is_none()
514 && self.station_name.is_none()
515 && self.route_target.is_none()
516 && self.model.is_none()
517 && self.service_tier.is_none()
518 }
519}
520
/// Stored per-session reasoning-effort override with its timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionEffortOverride {
    /// The overriding reasoning effort.
    pub(super) effort: String,
    // NOTE(review): marked dead code, so presumably never read and kept only
    // as a record — confirm.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    /// Timestamp (ms) of the last activity seen for the session.
    pub(super) last_seen_ms: u64,
}
528
/// Stored per-session station override with its timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionStationOverride {
    /// Name of the overriding station.
    pub(super) station_name: String,
    // NOTE(review): marked dead code, so presumably never read and kept only
    // as a record — confirm.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    /// Timestamp (ms) of the last activity seen for the session.
    pub(super) last_seen_ms: u64,
}
536
/// Stored per-session route-target override with its timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionRouteTargetOverride {
    /// The overriding route target.
    pub(super) target: String,
    // NOTE(review): marked dead code, so presumably never read and kept only
    // as a record — confirm.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    /// Timestamp (ms) of the last activity seen for the session.
    pub(super) last_seen_ms: u64,
}
544
/// Stored per-session model override with its timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionModelOverride {
    /// The overriding model name.
    pub(super) model: String,
    // NOTE(review): marked dead code, so presumably never read and kept only
    // as a record — confirm.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    /// Timestamp (ms) of the last activity seen for the session.
    pub(super) last_seen_ms: u64,
}
552
/// Stored per-session service-tier override with its timestamps.
#[derive(Debug, Clone)]
pub(super) struct SessionServiceTierOverride {
    /// The overriding service tier.
    pub(super) service_tier: String,
    // NOTE(review): marked dead code, so presumably never read and kept only
    // as a record — confirm.
    #[allow(dead_code)]
    pub(super) updated_at_ms: u64,
    /// Timestamp (ms) of the last activity seen for the session.
    pub(super) last_seen_ms: u64,
}
560
/// Storage wrapper around a [`SessionBinding`]; it currently holds nothing
/// but the binding itself.
#[derive(Debug, Clone)]
pub(super) struct SessionBindingEntry {
    pub(super) binding: SessionBinding,
}
565
/// Cached working directory for a session.
// NOTE(review): `cwd` is optional, so presumably a failed lookup is cached as
// `None` as well — confirm against the code that fills the cache.
#[derive(Debug, Clone)]
pub(super) struct SessionCwdCacheEntry {
    pub(super) cwd: Option<String>,
    /// Timestamp (ms) of the last activity seen for the session.
    pub(super) last_seen_ms: u64,
}
571
572fn empty_session_identity_card(session_id: Option<String>) -> SessionIdentityCard {
573 SessionIdentityCard {
574 session_id,
575 observation_scope: SessionObservationScope::ObservedOnly,
576 host_local_transcript_path: None,
577 last_client_name: None,
578 last_client_addr: None,
579 cwd: None,
580 active_count: 0,
581 active_started_at_ms_min: None,
582 last_status: None,
583 last_duration_ms: None,
584 last_ended_at_ms: None,
585 last_model: None,
586 last_reasoning_effort: None,
587 last_service_tier: None,
588 last_provider_id: None,
589 last_station_name: None,
590 last_upstream_base_url: None,
591 last_usage: None,
592 total_usage: None,
593 turns_total: None,
594 turns_with_usage: None,
595 binding_profile_name: None,
596 binding_continuity_mode: None,
597 last_route_decision: None,
598 route_affinity: None,
599 effective_model: None,
600 effective_reasoning_effort: None,
601 effective_service_tier: None,
602 effective_station: None,
603 effective_upstream_base_url: None,
604 override_effort: None,
605 override_station_name: None,
606 override_model: None,
607 override_service_tier: None,
608 }
609}
610
/// Trims surrounding whitespace and returns an owned copy of the text, or
/// `None` when the input is missing or blank.
fn non_empty_trimmed(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
617
618fn resolve_effective_observed_value(
619 override_value: Option<&str>,
620 observed_value: Option<&str>,
621 binding_value: Option<&str>,
622) -> Option<ResolvedRouteValue> {
623 if let Some(value) = non_empty_trimmed(override_value) {
624 return Some(ResolvedRouteValue::new(
625 value,
626 RouteValueSource::SessionOverride,
627 ));
628 }
629 let binding_value = non_empty_trimmed(binding_value);
630 if let Some(binding) = binding_value {
631 return Some(ResolvedRouteValue::new(
632 binding,
633 RouteValueSource::ProfileDefault,
634 ));
635 }
636 non_empty_trimmed(observed_value)
637 .map(|observed| ResolvedRouteValue::new(observed, RouteValueSource::RequestPayload))
638}
639
640fn classify_session_observation_scope(card: &SessionIdentityCard) -> SessionObservationScope {
641 if card.cwd.is_some() {
642 SessionObservationScope::HostLocalEnriched
643 } else {
644 SessionObservationScope::ObservedOnly
645 }
646}
647
648fn resolve_effective_station_value(
649 card: &SessionIdentityCard,
650 global_station_override: Option<&str>,
651 binding_station_name: Option<&str>,
652) -> Option<ResolvedRouteValue> {
653 if let Some(value) = non_empty_trimmed(card.override_station_name.as_deref()) {
654 return Some(ResolvedRouteValue::new(
655 value,
656 RouteValueSource::SessionOverride,
657 ));
658 }
659 if let Some(value) = non_empty_trimmed(global_station_override) {
660 return Some(ResolvedRouteValue::new(
661 value,
662 RouteValueSource::GlobalOverride,
663 ));
664 }
665 let binding = non_empty_trimmed(binding_station_name);
666 if let Some(binding) = binding {
667 return Some(ResolvedRouteValue::new(
668 binding,
669 RouteValueSource::ProfileDefault,
670 ));
671 }
672 non_empty_trimmed(card.last_station_name.as_deref())
673 .map(|observed| ResolvedRouteValue::new(observed, RouteValueSource::RuntimeFallback))
674}
675
676fn apply_basic_effective_route(
677 card: &mut SessionIdentityCard,
678 global_station_override: Option<&str>,
679 binding: Option<&SessionBinding>,
680) {
681 card.effective_model = resolve_effective_observed_value(
682 card.override_model.as_deref(),
683 card.last_model.as_deref(),
684 binding.and_then(|binding| binding.model.as_deref()),
685 );
686 card.effective_reasoning_effort = resolve_effective_observed_value(
687 card.override_effort.as_deref(),
688 card.last_reasoning_effort.as_deref(),
689 binding.and_then(|binding| binding.reasoning_effort.as_deref()),
690 );
691 card.effective_service_tier = resolve_effective_observed_value(
692 card.override_service_tier.as_deref(),
693 card.last_service_tier.as_deref(),
694 binding.and_then(|binding| binding.service_tier.as_deref()),
695 );
696 card.binding_profile_name = binding.and_then(|binding| binding.profile_name.clone());
697 card.binding_continuity_mode = binding.map(|binding| binding.continuity_mode);
698 card.effective_station = resolve_effective_station_value(
699 card,
700 global_station_override,
701 binding.and_then(|binding| binding.station_name.as_deref()),
702 );
703 card.effective_upstream_base_url = match (
704 card.effective_station.as_ref(),
705 non_empty_trimmed(card.last_station_name.as_deref()),
706 non_empty_trimmed(card.last_upstream_base_url.as_deref()),
707 ) {
708 (Some(station), Some(last_station), Some(upstream)) if station.value == last_station => {
709 Some(ResolvedRouteValue::new(
710 upstream,
711 RouteValueSource::RuntimeFallback,
712 ))
713 }
714 _ => None,
715 };
716}
717
/// Refines the effective route values of each card using the runtime
/// configuration held by `mgr`.
///
/// For every card, in order:
/// 1. If no effective station is set, the manager's active station (if any)
///    is used with source `RuntimeFallback`.
/// 2. If no effective upstream URL is set and the effective station has
///    exactly one upstream, that upstream's base URL is used with source
///    `RuntimeFallback`.
/// 3. If the effective station is also the last observed station and the last
///    observed upstream is found on it, the effective model is passed through
///    that upstream's model mapping; a changed name is stored with source
///    `StationMapping`.
pub fn enrich_session_identity_cards_with_runtime(
    cards: &mut [SessionIdentityCard],
    mgr: &ServiceConfigManager,
) {
    for card in cards {
        // Step 1: fall back to the active station.
        if card.effective_station.is_none()
            && let Some(active) = mgr.active_station()
        {
            card.effective_station = Some(ResolvedRouteValue::new(
                active.name.clone(),
                RouteValueSource::RuntimeFallback,
            ));
        }

        // Borrows only `card.effective_station`; the other card fields can
        // still be assigned below.
        let effective_station_name = card
            .effective_station
            .as_ref()
            .map(|value| value.value.as_str());
        // Step 2: a station with a single upstream has an unambiguous URL.
        if card.effective_upstream_base_url.is_none()
            && let Some(station_name) = effective_station_name
            && let Some(station) = mgr.station(station_name)
            && station.upstreams.len() == 1
        {
            card.effective_upstream_base_url = Some(ResolvedRouteValue::new(
                station.upstreams[0].base_url.clone(),
                RouteValueSource::RuntimeFallback,
            ));
        }

        // Step 3: model mapping. Each missing precondition skips the card.
        let Some(model) = card
            .effective_model
            .as_ref()
            .map(|value| value.value.clone())
        else {
            continue;
        };
        let Some(station_name) = effective_station_name else {
            continue;
        };
        let Some(last_station_name) = card.last_station_name.as_deref() else {
            continue;
        };
        // The last observed upstream only applies to the station it was seen on.
        if last_station_name != station_name {
            continue;
        }
        let Some(last_upstream_base_url) = card.last_upstream_base_url.as_deref() else {
            continue;
        };
        let Some(station) = mgr.station(station_name) else {
            continue;
        };
        let Some(upstream) = station
            .upstreams
            .iter()
            .find(|upstream| upstream.base_url == last_upstream_base_url)
        else {
            continue;
        };

        let mapped = crate::model_routing::effective_model(&upstream.model_mapping, model.as_str());
        // Only replace the model (and its source) when the mapping changed it.
        if mapped != model {
            card.effective_model = Some(ResolvedRouteValue::new(
                mapped,
                RouteValueSource::StationMapping,
            ));
        }
    }
}
786
787fn session_identity_sort_key(card: &SessionIdentityCard) -> u64 {
788 card.last_ended_at_ms
789 .unwrap_or(0)
790 .max(card.active_started_at_ms_min.unwrap_or(0))
791}
792
793fn route_decision_at_ms(route_decision: Option<&RouteDecisionProvenance>) -> u64 {
794 route_decision
795 .map(|decision| decision.decided_at_ms)
796 .unwrap_or(0)
797}
798
799fn update_card_route_decision(
800 card: &mut SessionIdentityCard,
801 route_decision: Option<&RouteDecisionProvenance>,
802) {
803 let Some(route_decision) = route_decision.cloned() else {
804 return;
805 };
806 let current_at = route_decision_at_ms(card.last_route_decision.as_ref());
807 if current_at <= route_decision.decided_at_ms {
808 card.last_route_decision = Some(route_decision);
809 }
810}
811
/// Borrowed inputs for `build_session_identity_cards_from_parts`.
///
/// All maps are keyed by session id.
pub struct SessionIdentityCardBuildInputs<'a> {
    /// Active requests.
    pub active: &'a [ActiveRequest],
    /// Recently finished requests.
    pub recent: &'a [FinishedRequest],
    /// Per-session reasoning-effort overrides (stored as `override_effort`).
    pub overrides: &'a HashMap<String, String>,
    /// Per-session station overrides.
    pub station_overrides: &'a HashMap<String, String>,
    /// Per-session model overrides.
    pub model_overrides: &'a HashMap<String, String>,
    /// Per-session service-tier overrides.
    pub service_tier_overrides: &'a HashMap<String, String>,
    /// Session bindings.
    pub bindings: &'a HashMap<String, SessionBinding>,
    /// Remembered route targets.
    pub route_affinities: &'a HashMap<String, SessionRouteAffinity>,
    /// Station override applying to all sessions, if set.
    pub global_station_override: Option<&'a str>,
    /// Stored per-session statistics.
    pub stats: &'a HashMap<String, SessionStats>,
}
824
825pub fn build_session_identity_cards_from_parts(
826 inputs: SessionIdentityCardBuildInputs<'_>,
827) -> Vec<SessionIdentityCard> {
828 let SessionIdentityCardBuildInputs {
829 active,
830 recent,
831 overrides,
832 station_overrides,
833 model_overrides,
834 service_tier_overrides,
835 bindings,
836 route_affinities,
837 global_station_override,
838 stats,
839 } = inputs;
840
841 use std::collections::HashMap as StdHashMap;
842
843 let mut map: StdHashMap<Option<String>, SessionIdentityCard> = StdHashMap::new();
844
845 for req in active {
846 let key = req.session_id.clone();
847 let entry = map
848 .entry(key.clone())
849 .or_insert_with(|| empty_session_identity_card(key));
850
851 entry.active_count = entry.active_count.saturating_add(1);
852 entry.active_started_at_ms_min = Some(
853 entry
854 .active_started_at_ms_min
855 .unwrap_or(req.started_at_ms)
856 .min(req.started_at_ms),
857 );
858 if entry.cwd.is_none() {
859 entry.cwd = req.cwd.clone();
860 }
861 if entry.last_client_name.is_none() {
862 entry.last_client_name = req.client_name.clone();
863 }
864 if entry.last_client_addr.is_none() {
865 entry.last_client_addr = req.client_addr.clone();
866 }
867 if let Some(effort) = req.reasoning_effort.as_ref() {
868 entry.last_reasoning_effort = Some(effort.clone());
869 }
870 if let Some(service_tier) = req.service_tier.as_ref() {
871 entry.last_service_tier = Some(service_tier.clone());
872 }
873 if entry.last_model.is_none() {
874 entry.last_model = req.model.clone();
875 }
876 if entry.last_provider_id.is_none() {
877 entry.last_provider_id = req.provider_id.clone();
878 }
879 if entry.last_station_name.is_none() {
880 entry.last_station_name = req.station_name.clone();
881 }
882 if entry.last_upstream_base_url.is_none() {
883 entry.last_upstream_base_url = req.upstream_base_url.clone();
884 }
885 update_card_route_decision(entry, req.route_decision.as_ref());
886 }
887
888 for r in recent {
889 let key = r.session_id.clone();
890 let entry = map
891 .entry(key.clone())
892 .or_insert_with(|| empty_session_identity_card(key));
893
894 let should_update = entry
895 .last_ended_at_ms
896 .is_none_or(|prev| r.ended_at_ms >= prev);
897 if should_update {
898 entry.last_status = Some(r.status_code);
899 entry.last_duration_ms = Some(r.duration_ms);
900 entry.last_ended_at_ms = Some(r.ended_at_ms);
901 entry.last_client_name = r.client_name.clone().or(entry.last_client_name.clone());
902 entry.last_client_addr = r.client_addr.clone().or(entry.last_client_addr.clone());
903 entry.last_model = r.model.clone().or(entry.last_model.clone());
904 entry.last_reasoning_effort = r
905 .reasoning_effort
906 .clone()
907 .or(entry.last_reasoning_effort.clone());
908 entry.last_service_tier = r.service_tier.clone().or(entry.last_service_tier.clone());
909 entry.last_provider_id = r.provider_id.clone().or(entry.last_provider_id.clone());
910 entry.last_station_name = r.station_name.clone().or(entry.last_station_name.clone());
911 entry.last_upstream_base_url = r
912 .upstream_base_url
913 .clone()
914 .or(entry.last_upstream_base_url.clone());
915 entry.last_usage = r.usage.clone().or(entry.last_usage.clone());
916 }
917 if entry.cwd.is_none() {
918 entry.cwd = r.cwd.clone();
919 }
920 update_card_route_decision(entry, r.route_decision.as_ref());
921 }
922
923 for (sid, st) in stats {
924 let key = Some(sid.clone());
925 let entry = map
926 .entry(key.clone())
927 .or_insert_with(|| empty_session_identity_card(key));
928
929 if entry.turns_total.is_none() {
930 entry.turns_total = Some(st.turns_total);
931 }
932 if entry.last_client_name.is_none() {
933 entry.last_client_name = st.last_client_name.clone();
934 }
935 if entry.last_client_addr.is_none() {
936 entry.last_client_addr = st.last_client_addr.clone();
937 }
938 if entry.last_status.is_none() {
939 entry.last_status = st.last_status;
940 }
941 if entry.last_duration_ms.is_none() {
942 entry.last_duration_ms = st.last_duration_ms;
943 }
944 if entry.last_ended_at_ms.is_none() {
945 entry.last_ended_at_ms = st.last_ended_at_ms;
946 }
947 if entry.last_model.is_none() {
948 entry.last_model = st.last_model.clone();
949 }
950 if entry.last_reasoning_effort.is_none() {
951 entry.last_reasoning_effort = st.last_reasoning_effort.clone();
952 }
953 if entry.last_service_tier.is_none() {
954 entry.last_service_tier = st.last_service_tier.clone();
955 }
956 if entry.last_provider_id.is_none() {
957 entry.last_provider_id = st.last_provider_id.clone();
958 }
959 if entry.last_station_name.is_none() {
960 entry.last_station_name = st.last_station_name.clone();
961 }
962 if entry.last_usage.is_none() {
963 entry.last_usage = st.last_usage.clone();
964 }
965 if entry.total_usage.is_none() {
966 entry.total_usage = Some(st.total_usage.clone());
967 }
968 if entry.turns_with_usage.is_none() {
969 entry.turns_with_usage = Some(st.turns_with_usage);
970 }
971 update_card_route_decision(entry, st.last_route_decision.as_ref());
972 }
973
974 for (sid, eff) in overrides {
975 let key = Some(sid.clone());
976 let entry = map
977 .entry(key.clone())
978 .or_insert_with(|| empty_session_identity_card(key));
979 entry.override_effort = Some(eff.clone());
980 }
981
982 for (sid, station_name) in station_overrides {
983 let key = Some(sid.clone());
984 let entry = map
985 .entry(key.clone())
986 .or_insert_with(|| empty_session_identity_card(key));
987 entry.override_station_name = Some(station_name.clone());
988 }
989
990 for (sid, model) in model_overrides {
991 let key = Some(sid.clone());
992 let entry = map
993 .entry(key.clone())
994 .or_insert_with(|| empty_session_identity_card(key));
995 entry.override_model = Some(model.clone());
996 }
997
998 for (sid, service_tier) in service_tier_overrides {
999 let key = Some(sid.clone());
1000 let entry = map
1001 .entry(key.clone())
1002 .or_insert_with(|| empty_session_identity_card(key));
1003 entry.override_service_tier = Some(service_tier.clone());
1004 }
1005
1006 for (sid, affinity) in route_affinities {
1007 let key = Some(sid.clone());
1008 let entry = map
1009 .entry(key.clone())
1010 .or_insert_with(|| empty_session_identity_card(key));
1011 entry.route_affinity = Some(affinity.clone());
1012 }
1013
1014 let mut cards = map.into_values().collect::<Vec<_>>();
1015 for card in &mut cards {
1016 let binding = card
1017 .session_id
1018 .as_deref()
1019 .and_then(|session_id| bindings.get(session_id));
1020 apply_basic_effective_route(card, global_station_override, binding);
1021 card.observation_scope = classify_session_observation_scope(card);
1022 }
1023 cards.sort_by_key(|card| std::cmp::Reverse(session_identity_sort_key(card)));
1024 cards
1025}
1026
1027pub async fn enrich_session_identity_cards_with_host_transcripts(
1028 cards: &mut [SessionIdentityCard],
1029) {
1030 let session_ids = cards
1031 .iter()
1032 .filter_map(|card| card.session_id.as_deref())
1033 .map(str::to_owned)
1034 .collect::<Vec<_>>();
1035 if session_ids.is_empty() {
1036 return;
1037 }
1038
1039 let found = match sessions::find_codex_session_files_by_ids(&session_ids).await {
1040 Ok(found) => found,
1041 Err(_) => return,
1042 };
1043
1044 for card in cards {
1045 card.host_local_transcript_path = card
1046 .session_id
1047 .as_deref()
1048 .and_then(|sid| found.get(sid))
1049 .map(|path| path.to_string_lossy().to_string());
1050 }
1051}
1052
#[cfg(test)]
mod tests {
    use super::*;

    use crate::logging::RetryInfo;
    use crate::pricing::CostBreakdown;
    use crate::usage::UsageMetrics;

    /// Builds the `FinishedRequest` fixture shared by the tests below.
    ///
    /// The fixture is a streaming request with status 200 that took 1_500 ms
    /// with a 500 ms TTFB and reported 200 output tokens. Its retry info records
    /// two attempts whose upstream chain lists a `backup` entry followed by a
    /// `primary` entry. The raw `service_tier` is `default`, while the route
    /// decision's effective service tier is `priority`.
    fn sample_finished_request() -> FinishedRequest {
        let route_decision = RouteDecisionProvenance {
            effective_service_tier: Some(ResolvedRouteValue {
                value: "priority".to_owned(),
                source: RouteValueSource::ProfileDefault,
            }),
            ..RouteDecisionProvenance::default()
        };
        let usage = UsageMetrics {
            output_tokens: 200,
            total_tokens: 200,
            ..Default::default()
        };
        let retry = RetryInfo {
            attempts: 2,
            upstream_chain: vec![
                "backup:https://backup.example/v1 (idx=0) transport_error=timeout".to_owned(),
                "primary:https://primary.example/v1 (idx=0) status=200 class=-".to_owned(),
            ],
            route_attempts: vec![],
        };

        FinishedRequest {
            id: 7,
            trace_id: Some("codex-7".to_owned()),
            session_id: Some("sid".to_owned()),
            client_name: None,
            client_addr: None,
            cwd: None,
            model: Some("gpt-5".to_owned()),
            reasoning_effort: None,
            service_tier: Some("default".to_owned()),
            station_name: Some("primary".to_owned()),
            provider_id: Some("primary-provider".to_owned()),
            upstream_base_url: Some("https://primary.example/v1".to_owned()),
            route_decision: Some(route_decision),
            usage: Some(usage),
            cost: CostBreakdown::default(),
            retry: Some(retry),
            observability: RequestObservability::default(),
            service: "codex".to_owned(),
            method: "POST".to_owned(),
            path: "/v1/responses".to_owned(),
            status_code: 200,
            duration_ms: 1_500,
            ttfb_ms: Some(500),
            streaming: true,
            ended_at_ms: 2_000,
        }
    }

    /// `observability_view` on the fixture must report the expected trace id,
    /// timings, attempt counts, flags, and output token rate.
    #[test]
    fn finished_request_observability_derives_canonical_request_facts() {
        let finished = sample_finished_request();

        let view = finished.observability_view();

        assert_eq!(view.trace_id.as_deref(), Some("codex-7"));
        // Timings: total duration, time to first byte, and the remainder.
        assert_eq!(
            (view.duration_ms, view.ttfb_ms, view.generation_ms),
            (Some(1_500), Some(500), Some(1_000))
        );
        assert_eq!((view.attempt_count, view.route_attempt_count), (2, 2));
        assert!(view.retried);
        assert!(view.cross_station_failover);
        assert!(view.fast_mode);
        assert!(view.streaming);
        assert_eq!(view.output_tokens_per_second, Some(200.0));
    }

    /// After `refresh_observability`, the serialized request must expose the
    /// derived values under the `observability` JSON key.
    #[test]
    fn finished_request_serializes_materialized_observability_for_operator_api() {
        let mut finished = sample_finished_request();
        finished.refresh_observability();

        let json = serde_json::to_value(&finished).expect("finished request json");
        let observability = &json["observability"];

        assert_eq!(observability["trace_id"].as_str(), Some("codex-7"));
        assert_eq!(observability["generation_ms"].as_u64(), Some(1_000));
        assert_eq!(
            observability["output_tokens_per_second"].as_f64(),
            Some(200.0)
        );
        assert_eq!(observability["attempt_count"].as_u64(), Some(2));
        assert_eq!(observability["streaming"].as_bool(), Some(true));
    }

    /// A payload with no `observability`, `streaming`, or `retry` keys must
    /// still deserialize, and `observability_view` must still yield the
    /// expected derived values.
    #[test]
    fn finished_request_legacy_payload_still_derives_observability() {
        let payload = serde_json::json!({
            "id": 8,
            "trace_id": "codex-8",
            "usage": {
                "output_tokens": 120,
                "total_tokens": 120
            },
            "cost": {},
            "service": "codex",
            "method": "POST",
            "path": "/v1/responses",
            "status_code": 200,
            "duration_ms": 1_500,
            "ttfb_ms": 300,
            "ended_at_ms": 2_500
        });
        let legacy: FinishedRequest =
            serde_json::from_value(payload).expect("legacy finished request");

        // Values taken straight from the deserialized struct.
        assert_eq!(legacy.observability.attempt_count, 1);
        assert!(!legacy.streaming);

        let view = legacy.observability_view();
        assert_eq!(view.generation_ms, Some(1_200));
        assert_eq!(view.output_tokens_per_second, Some(100.0));
        assert_eq!(view.attempt_count, 1);
        assert!(!view.fast_mode);
    }
}