1use std::collections::{BTreeMap, BTreeSet, HashMap};
2
3use serde::{Deserialize, Serialize};
4
5use crate::pricing::{CostConfidence, UsdAmount};
6use crate::routing_explain::{RoutingExplainCandidate, RoutingExplainResponse};
7use crate::state::{
8 BalanceSnapshotStatus, FinishedRequest, ProviderBalanceSnapshot, UsageBucket, UsageRollupView,
9};
10use crate::usage_providers::UsageProviderRefreshSummary;
11
/// Display-oriented balance status shared by snapshot summaries, endpoint rows,
/// and provider rows.
///
/// Serialized in `snake_case`; [`UsageBalanceStatus::as_str`] returns the same labels.
/// Per-snapshot values are produced by `classify_snapshot`, which maps
/// `BalanceSnapshotStatus` one-to-one, except that an `Ok` snapshot reporting
/// `unlimited_quota == Some(true)` becomes [`UsageBalanceStatus::Unlimited`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, Hash)]
#[serde(rename_all = "snake_case")]
pub enum UsageBalanceStatus {
    /// From `BalanceSnapshotStatus::Unknown`. Also the `Default`, and what an empty
    /// [`UsageBalanceStatusCounts`] aggregates to.
    #[default]
    Unknown,
    /// From `BalanceSnapshotStatus::Ok` when the quota is not flagged unlimited.
    Ok,
    /// From `BalanceSnapshotStatus::Ok` with `unlimited_quota == Some(true)`.
    Unlimited,
    /// From `BalanceSnapshotStatus::Exhausted`.
    Exhausted,
    /// From `BalanceSnapshotStatus::Stale`.
    Stale,
    /// From `BalanceSnapshotStatus::Error`.
    Error,
}
23
24impl UsageBalanceStatus {
25 pub fn as_str(self) -> &'static str {
26 match self {
27 UsageBalanceStatus::Unknown => "unknown",
28 UsageBalanceStatus::Ok => "ok",
29 UsageBalanceStatus::Unlimited => "unlimited",
30 UsageBalanceStatus::Exhausted => "exhausted",
31 UsageBalanceStatus::Stale => "stale",
32 UsageBalanceStatus::Error => "error",
33 }
34 }
35
36 pub fn is_attention(self) -> bool {
37 matches!(
38 self,
39 UsageBalanceStatus::Unknown
40 | UsageBalanceStatus::Exhausted
41 | UsageBalanceStatus::Stale
42 | UsageBalanceStatus::Error
43 )
44 }
45}
46
/// Tally of [`UsageBalanceStatus`] values, with one counter per variant.
///
/// Every caller in this module records one status per balance snapshot.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UsageBalanceStatusCounts {
    pub ok: usize,
    pub unlimited: usize,
    pub exhausted: usize,
    pub stale: usize,
    pub error: usize,
    pub unknown: usize,
}
56
57impl UsageBalanceStatusCounts {
58 pub fn total(&self) -> usize {
59 self.ok
60 .saturating_add(self.unlimited)
61 .saturating_add(self.exhausted)
62 .saturating_add(self.stale)
63 .saturating_add(self.error)
64 .saturating_add(self.unknown)
65 }
66
67 pub fn record(&mut self, status: UsageBalanceStatus) {
68 match status {
69 UsageBalanceStatus::Ok => self.ok += 1,
70 UsageBalanceStatus::Unlimited => self.unlimited += 1,
71 UsageBalanceStatus::Exhausted => self.exhausted += 1,
72 UsageBalanceStatus::Stale => self.stale += 1,
73 UsageBalanceStatus::Error => self.error += 1,
74 UsageBalanceStatus::Unknown => self.unknown += 1,
75 }
76 }
77
78 pub fn aggregate_status(&self) -> UsageBalanceStatus {
79 if self.total() == 0 {
80 UsageBalanceStatus::Unknown
81 } else if self.error > 0 {
82 UsageBalanceStatus::Error
83 } else if self.exhausted > 0 {
84 UsageBalanceStatus::Exhausted
85 } else if self.stale > 0 {
86 UsageBalanceStatus::Stale
87 } else if self.unknown > 0 {
88 UsageBalanceStatus::Unknown
89 } else if self.unlimited > 0 {
90 UsageBalanceStatus::Unlimited
91 } else {
92 UsageBalanceStatus::Ok
93 }
94 }
95}
96
/// Combined usage, cost, balance, and routing view for one service, produced by
/// [`UsageBalanceView::build`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceView {
    pub service_name: String,
    /// Copied verbatim from the build input; not used in any computation in this module.
    pub window_days: usize,
    /// Build timestamp, also used as "now" for snapshot status and age calculations
    /// (milliseconds, per the `_ms` naming).
    pub generated_at_ms: u64,
    pub totals: UsageBalanceTotals,
    /// Sorted: route-selected first, then by attention rank, cost (descending),
    /// request count (descending), and provider id.
    pub provider_rows: Vec<UsageBalanceProviderRow>,
    /// Sorted: route-selected first, then by attention rank, request count (descending),
    /// provider id, and endpoint id.
    pub endpoint_rows: Vec<UsageBalanceEndpointRow>,
    /// One entry per provider that appeared in the routing explanation, in provider-id order.
    pub routing_impacts: Vec<UsageBalanceRouteImpact>,
    pub refresh_status: UsageBalanceRefreshStatus,
    /// Omitted from serialized output when empty; `build` always leaves it empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
110
/// Service-wide totals taken from the usage rollup window, plus a tally of all
/// balance snapshot statuses.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceTotals {
    pub requests_total: u64,
    pub requests_error: u64,
    /// Successful requests per thousand; `None` when there were no requests.
    pub success_per_mille: Option<u16>,
    pub input_tokens: i64,
    /// From the rollup usage's `cache_read_tokens_total()`.
    pub cached_input_tokens: i64,
    pub output_tokens: i64,
    /// From the rollup usage's `reasoning_output_tokens_total()`.
    pub reasoning_output_tokens: i64,
    pub total_tokens: i64,
    pub cost_total_usd: Option<String>,
    pub cost_display: String,
    pub cost_confidence: CostConfidence,
    /// Status tally over every snapshot in the build input's `provider_balances`.
    pub balance_status_counts: UsageBalanceStatusCounts,
}
126
/// Per-provider summary row combining usage, cost, balance, and routing state.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceProviderRow {
    pub provider_id: String,
    /// Currently always equal to `provider_id`.
    pub display_name: String,
    /// The provider's bucket from the usage rollup, or an empty default when absent.
    pub usage: UsageBucket,
    /// Successful requests per thousand; `None` when the provider had no requests.
    pub success_per_mille: Option<u16>,
    /// Output tokens * 1000 / `generation_ms_total`; `None` when either is zero.
    pub output_tokens_per_second: Option<u64>,
    /// Integer mean of recorded TTFB samples; `None` when there are no samples.
    pub avg_ttfb_ms: Option<u64>,
    pub cost_total_usd: Option<String>,
    pub cost_display: String,
    pub cost_confidence: CostConfidence,
    /// `balance_counts.aggregate_status()`.
    pub balance_status: UsageBalanceStatus,
    /// Status tally over this provider's balance snapshots.
    pub balance_counts: UsageBalanceStatusCounts,
    /// Snapshot chosen by display rank, then lowest upstream index, then newest fetch.
    pub primary_balance: Option<UsageBalanceSnapshotSummary>,
    /// Newest non-blank snapshot error message, trimmed.
    pub latest_balance_error: Option<String>,
    /// `age_ms` of `primary_balance`.
    pub balance_age_ms: Option<u64>,
    /// Routing impact from the routing explanation, or an empty impact for this provider.
    pub routing: UsageBalanceRouteImpact,
    /// Number of endpoint rows belonging to this provider.
    pub endpoint_count: usize,
    /// Number of those endpoint rows that carry a balance summary.
    pub endpoints_with_balance: usize,
    /// Sum of `usage.requests_total` over this provider's endpoint rows.
    pub recent_endpoint_requests: u64,
}
148
149impl UsageBalanceProviderRow {
150 pub fn needs_attention(&self) -> bool {
151 self.balance_status.is_attention()
152 || self
153 .latest_balance_error
154 .as_deref()
155 .is_some_and(|value| !value.trim().is_empty())
156 || self.usage.requests_error > 0
157 }
158}
159
/// Per-endpoint row merged from route candidates, recent requests, and balance snapshots.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceEndpointRow {
    pub provider_id: String,
    /// Endpoint id from the routing explanation when one matches. Otherwise it is derived
    /// from the request base URL's host, `upstream#N`, the snapshot source, or the
    /// placeholders `"balance"` / `"unknown_endpoint"`.
    pub endpoint_id: String,
    pub station_name: Option<String>,
    pub upstream_index: Option<usize>,
    pub base_url: Option<String>,
    /// Built only from the recent finished requests, not from the usage rollup.
    pub usage: UsageBucket,
    /// Status of `balance`, or `Unknown` when there is none.
    pub balance_status: UsageBalanceStatus,
    pub balance: Option<UsageBalanceSnapshotSummary>,
    /// True if any route candidate for this endpoint was selected.
    pub route_selected: bool,
    /// Sorted, de-duplicated skip-reason codes from route candidates.
    pub route_skip_reasons: Vec<String>,
}
173
/// Serializable summary of one `ProviderBalanceSnapshot`, evaluated at the view's build time.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceSnapshotSummary {
    pub provider_id: String,
    pub station_name: Option<String>,
    pub upstream_index: Option<usize>,
    pub source: String,
    pub status: UsageBalanceStatus,
    /// From the snapshot's `amount_summary()`.
    pub amount_summary: String,
    pub fetched_at_ms: u64,
    pub stale_after_ms: Option<u64>,
    /// Build time minus `fetched_at_ms`; `None` when the fetch time is later than build time.
    pub age_ms: Option<u64>,
    pub error: Option<String>,
    pub exhaustion_affects_routing: bool,
    /// Exhausted at build time and `exhaustion_affects_routing` is set.
    pub routing_exhausted: bool,
    /// Exhausted at build time but `exhaustion_affects_routing` is not set.
    pub routing_ignored_exhaustion: bool,
}
190
/// How a provider fared in the routing explanation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UsageBalanceRouteImpact {
    pub provider_id: String,
    /// True if any of the provider's candidates was selected.
    pub selected: bool,
    /// Endpoint id of the selected candidate (the last one recorded, if several were selected).
    pub selected_endpoint_id: Option<String>,
    /// `provider_endpoint_key` of that same selected candidate.
    pub selected_provider_endpoint_key: Option<String>,
    /// Number of candidates recorded for this provider.
    pub candidate_count: usize,
    /// Sorted, de-duplicated skip-reason codes across the provider's candidates.
    pub skip_reasons: Vec<String>,
    /// Sorted, de-duplicated route paths of the provider's candidates.
    pub route_paths: Vec<Vec<String>>,
}
201
/// Balance-refresh state: caller-supplied refresh fields plus figures derived from the
/// current balance snapshots.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UsageBalanceRefreshStatus {
    /// Passed through from [`UsageBalanceRefreshInput`].
    pub refreshing: bool,
    /// Number of balance snapshots across all stations.
    pub total_snapshots: usize,
    /// Largest `fetched_at_ms` among the snapshots; `None` when there are none.
    pub latest_fetched_at_ms: Option<u64>,
    /// Newest non-blank snapshot error message, trimmed.
    pub latest_error: Option<String>,
    /// Provider the `latest_error` is attributed to, when one could be determined.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub latest_error_provider_id: Option<String>,
    /// Passed through from [`UsageBalanceRefreshInput`].
    pub last_message: Option<String>,
    /// Passed through from [`UsageBalanceRefreshInput`].
    pub last_error: Option<String>,
    /// Passed through from [`UsageBalanceRefreshInput`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_provider_refresh: Option<UsageProviderRefreshSummary>,
    /// Status tally over all snapshots.
    pub status_counts: UsageBalanceStatusCounts,
}
216
/// Refresh-loop state supplied by the caller; its fields are copied unchanged into
/// [`UsageBalanceRefreshStatus`].
#[derive(Debug, Clone, Default)]
pub struct UsageBalanceRefreshInput {
    pub refreshing: bool,
    pub last_message: Option<String>,
    pub last_error: Option<String>,
    pub last_provider_refresh: Option<UsageProviderRefreshSummary>,
}
224
/// Everything [`UsageBalanceView::build`] needs, borrowed from the caller.
pub struct UsageBalanceBuildInput<'a> {
    pub service_name: &'a str,
    pub window_days: usize,
    /// Used as "now" for snapshot status and age calculations.
    pub generated_at_ms: u64,
    /// Source of the totals and of each provider row's `usage`.
    pub usage_rollup: &'a UsageRollupView,
    /// Balance snapshots grouped by map key. The key is treated as a station name and
    /// used as the provider id when a snapshot's `provider_id` is blank.
    pub provider_balances: &'a HashMap<String, Vec<ProviderBalanceSnapshot>>,
    /// Recently finished requests; they feed endpoint-row usage and contribute provider ids.
    pub recent: &'a [FinishedRequest],
    /// Optional routing explanation used to resolve endpoint ids and routing impacts.
    pub routing_explain: Option<&'a RoutingExplainResponse>,
    pub refresh: UsageBalanceRefreshInput,
}
235
236impl UsageBalanceView {
237 pub fn build(input: UsageBalanceBuildInput<'_>) -> Self {
238 let route_index = RouteIndex::from_explain(input.routing_explain);
239 let mut endpoint_rows = build_endpoint_rows(
240 input.recent,
241 input.provider_balances,
242 &route_index,
243 input.generated_at_ms,
244 );
245 sort_endpoint_rows(&mut endpoint_rows);
246
247 let provider_rows = build_provider_rows(&input, &route_index, &endpoint_rows);
248 let totals = build_totals(
249 input.usage_rollup,
250 input.provider_balances,
251 input.generated_at_ms,
252 );
253 let refresh_status = build_refresh_status(
254 input.provider_balances,
255 input.generated_at_ms,
256 input.refresh,
257 );
258 let routing_impacts = route_index
259 .provider_impacts
260 .values()
261 .cloned()
262 .collect::<Vec<_>>();
263
264 Self {
265 service_name: input.service_name.to_string(),
266 window_days: input.window_days,
267 generated_at_ms: input.generated_at_ms,
268 totals,
269 provider_rows,
270 endpoint_rows,
271 routing_impacts,
272 refresh_status,
273 warnings: Vec::new(),
274 }
275 }
276}
277
278fn build_totals(
279 rollup: &UsageRollupView,
280 provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
281 now_ms: u64,
282) -> UsageBalanceTotals {
283 let window = &rollup.window;
284 let mut balance_status_counts = UsageBalanceStatusCounts::default();
285 for snapshot in provider_balances.values().flatten() {
286 balance_status_counts.record(classify_snapshot(snapshot, now_ms));
287 }
288
289 UsageBalanceTotals {
290 requests_total: window.requests_total,
291 requests_error: window.requests_error,
292 success_per_mille: per_mille(
293 window.requests_total.saturating_sub(window.requests_error),
294 window.requests_total,
295 ),
296 input_tokens: window.usage.input_tokens,
297 cached_input_tokens: window.usage.cache_read_tokens_total(),
298 output_tokens: window.usage.output_tokens,
299 reasoning_output_tokens: window.usage.reasoning_output_tokens_total(),
300 total_tokens: window.usage.total_tokens,
301 cost_total_usd: window.cost.total_cost_usd.clone(),
302 cost_display: window.cost.display_total(),
303 cost_confidence: window.cost.confidence,
304 balance_status_counts,
305 }
306}
307
308fn build_provider_rows(
309 input: &UsageBalanceBuildInput<'_>,
310 route_index: &RouteIndex,
311 endpoint_rows: &[UsageBalanceEndpointRow],
312) -> Vec<UsageBalanceProviderRow> {
313 let mut provider_ids = BTreeSet::new();
314 for (provider_id, _) in &input.usage_rollup.by_provider {
315 provider_ids.insert(provider_id.clone());
316 }
317 for (provider_id, _) in group_balances_by_provider(input.provider_balances) {
318 provider_ids.insert(provider_id);
319 }
320 for request in input.recent {
321 if let Some(provider_id) = request
322 .provider_id
323 .as_deref()
324 .map(str::trim)
325 .filter(|value| !value.is_empty())
326 {
327 provider_ids.insert(provider_id.to_string());
328 }
329 }
330 for provider_id in route_index.provider_impacts.keys() {
331 provider_ids.insert(provider_id.clone());
332 }
333
334 let usage_by_provider = input
335 .usage_rollup
336 .by_provider
337 .iter()
338 .cloned()
339 .collect::<BTreeMap<_, _>>();
340 let balances_by_provider = group_balances_by_provider(input.provider_balances);
341 let mut endpoint_counts = BTreeMap::<String, (usize, usize, u64)>::new();
342 for endpoint in endpoint_rows {
343 let entry = endpoint_counts
344 .entry(endpoint.provider_id.clone())
345 .or_insert((0, 0, 0));
346 entry.0 += 1;
347 if endpoint.balance.is_some() {
348 entry.1 += 1;
349 }
350 entry.2 = entry.2.saturating_add(endpoint.usage.requests_total);
351 }
352
353 let mut rows = provider_ids
354 .into_iter()
355 .map(|provider_id| {
356 let usage = usage_by_provider
357 .get(provider_id.as_str())
358 .cloned()
359 .unwrap_or_default();
360 let balances = balances_by_provider
361 .get(provider_id.as_str())
362 .cloned()
363 .unwrap_or_default();
364 let balance_counts = balance_counts_for_snapshots(&balances, input.generated_at_ms);
365 let primary_balance = primary_balance_snapshot(&balances, input.generated_at_ms)
366 .map(|snapshot| summarize_snapshot(snapshot, input.generated_at_ms));
367 let latest_balance_error = latest_balance_error(&balances);
368 let balance_age_ms = primary_balance.as_ref().and_then(|summary| summary.age_ms);
369 let balance_status = balance_counts.aggregate_status();
370 let routing = route_index
371 .provider_impacts
372 .get(provider_id.as_str())
373 .cloned()
374 .unwrap_or_else(|| UsageBalanceRouteImpact {
375 provider_id: provider_id.clone(),
376 ..UsageBalanceRouteImpact::default()
377 });
378 let (endpoint_count, endpoints_with_balance, recent_endpoint_requests) =
379 endpoint_counts
380 .get(provider_id.as_str())
381 .copied()
382 .unwrap_or_default();
383
384 UsageBalanceProviderRow {
385 display_name: provider_id.clone(),
386 provider_id,
387 success_per_mille: per_mille(
388 usage.requests_total.saturating_sub(usage.requests_error),
389 usage.requests_total,
390 ),
391 output_tokens_per_second: output_tokens_per_second(&usage),
392 avg_ttfb_ms: (usage.ttfb_samples > 0)
393 .then(|| usage.ttfb_ms_total / usage.ttfb_samples),
394 cost_total_usd: usage.cost.total_cost_usd.clone(),
395 cost_display: usage.cost.display_total(),
396 cost_confidence: usage.cost.confidence,
397 usage,
398 balance_status,
399 balance_counts,
400 primary_balance,
401 latest_balance_error,
402 balance_age_ms,
403 routing,
404 endpoint_count,
405 endpoints_with_balance,
406 recent_endpoint_requests,
407 }
408 })
409 .collect::<Vec<_>>();
410
411 sort_provider_rows(&mut rows);
412 rows
413}
414
415fn build_refresh_status(
416 provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
417 now_ms: u64,
418 input: UsageBalanceRefreshInput,
419) -> UsageBalanceRefreshStatus {
420 let mut out = UsageBalanceRefreshStatus {
421 refreshing: input.refreshing,
422 last_message: input.last_message,
423 last_error: input.last_error,
424 last_provider_refresh: input.last_provider_refresh,
425 ..UsageBalanceRefreshStatus::default()
426 };
427 let mut latest_error_fetched_at_ms: Option<u64> = None;
428
429 for snapshot in provider_balances.values().flatten() {
430 out.total_snapshots += 1;
431 out.latest_fetched_at_ms = Some(
432 out.latest_fetched_at_ms
433 .unwrap_or(0)
434 .max(snapshot.fetched_at_ms),
435 );
436 let status = classify_snapshot(snapshot, now_ms);
437 out.status_counts.record(status);
438 if let Some(error) = snapshot
439 .error
440 .as_deref()
441 .map(str::trim)
442 .filter(|v| !v.is_empty())
443 && latest_error_fetched_at_ms.is_none_or(|latest| snapshot.fetched_at_ms >= latest)
444 {
445 out.latest_error = Some(error.to_string());
446 out.latest_error_provider_id = non_empty(snapshot.provider_id.clone())
447 .or_else(|| non_empty(snapshot.station_name.clone().unwrap_or_default()));
448 latest_error_fetched_at_ms = Some(snapshot.fetched_at_ms);
449 }
450 }
451
452 out
453}
454
455fn build_endpoint_rows(
456 recent: &[FinishedRequest],
457 provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
458 route_index: &RouteIndex,
459 now_ms: u64,
460) -> Vec<UsageBalanceEndpointRow> {
461 let mut acc = BTreeMap::<EndpointKey, EndpointAccum>::new();
462
463 for candidate in route_index.candidates.values() {
464 let key = EndpointKey {
465 provider_id: candidate.provider_id.clone(),
466 endpoint_id: candidate.endpoint_id.clone(),
467 };
468 let row = acc
469 .entry(key.clone())
470 .or_insert_with(|| EndpointAccum::new(key));
471 row.base_url = non_empty(candidate.base_url.clone()).or(row.base_url.take());
472 row.route_selected |= candidate.selected;
473 row.route_skip_reasons
474 .extend(candidate.skip_reasons.iter().cloned());
475 if let Some((station_name, upstream_index)) = candidate.compatibility.as_ref() {
476 row.station_name = Some(station_name.clone());
477 row.upstream_index = Some(*upstream_index);
478 }
479 }
480
481 for request in recent {
482 let Some(provider_id) = request
483 .provider_id
484 .as_deref()
485 .map(str::trim)
486 .filter(|value| !value.is_empty())
487 else {
488 continue;
489 };
490 let base_url = request
491 .upstream_base_url
492 .as_deref()
493 .map(str::trim)
494 .filter(|value| !value.is_empty());
495 let endpoint_id = base_url
496 .and_then(|base_url| {
497 route_index
498 .endpoint_by_provider_base_url
499 .get(&(provider_id.to_string(), base_url.to_string()))
500 .cloned()
501 })
502 .unwrap_or_else(|| {
503 base_url
504 .map(endpoint_id_from_base_url)
505 .unwrap_or_else(|| "unknown_endpoint".to_string())
506 });
507 let key = EndpointKey {
508 provider_id: provider_id.to_string(),
509 endpoint_id,
510 };
511 let row = acc
512 .entry(key.clone())
513 .or_insert_with(|| EndpointAccum::new(key));
514 if row.base_url.is_none() {
515 row.base_url = base_url.map(ToOwned::to_owned);
516 }
517 record_finished_into_bucket(&mut row.usage, request);
518 }
519
520 for (station_name, snapshots) in provider_balances {
521 for snapshot in snapshots {
522 let provider_id = balance_provider_id(station_name, snapshot);
523 let endpoint_id = snapshot
524 .upstream_index
525 .and_then(|idx| {
526 route_index
527 .endpoint_by_provider_station_upstream
528 .get(&(provider_id.clone(), station_name.clone(), idx))
529 .cloned()
530 })
531 .unwrap_or_else(|| {
532 snapshot
533 .upstream_index
534 .map(|idx| format!("upstream#{idx}"))
535 .or_else(|| non_empty(snapshot.source.clone()))
536 .unwrap_or_else(|| "balance".to_string())
537 });
538 let key = EndpointKey {
539 provider_id: provider_id.clone(),
540 endpoint_id,
541 };
542 let row = acc
543 .entry(key.clone())
544 .or_insert_with(|| EndpointAccum::new(key));
545 row.station_name = snapshot
546 .station_name
547 .clone()
548 .or_else(|| Some(station_name.clone()))
549 .or(row.station_name.take());
550 row.upstream_index = snapshot.upstream_index.or(row.upstream_index);
551 let summary = summarize_snapshot(snapshot, now_ms);
552 let replace = row.balance.as_ref().is_none_or(|existing| {
553 snapshot_display_rank(summary.status) < snapshot_display_rank(existing.status)
554 });
555 if replace {
556 row.balance = Some(summary);
557 }
558 }
559 }
560
561 acc.into_values().map(EndpointAccum::finish).collect()
562}
563
564fn record_finished_into_bucket(bucket: &mut UsageBucket, request: &FinishedRequest) {
565 bucket.requests_total = bucket.requests_total.saturating_add(1);
566 if request.status_code >= 400 {
567 bucket.requests_error = bucket.requests_error.saturating_add(1);
568 }
569 bucket.duration_ms_total = bucket.duration_ms_total.saturating_add(request.duration_ms);
570
571 let Some(usage) = request.usage.as_ref() else {
572 return;
573 };
574
575 bucket.usage.add_assign(usage);
576 bucket.cost.record_usage_cost(&request.cost);
577 bucket.requests_with_usage = bucket.requests_with_usage.saturating_add(1);
578 bucket.duration_ms_with_usage_total = bucket
579 .duration_ms_with_usage_total
580 .saturating_add(request.duration_ms);
581 let generation_ms = match request.ttfb_ms {
582 Some(ttfb) if ttfb > 0 && ttfb < request.duration_ms => {
583 request.duration_ms.saturating_sub(ttfb)
584 }
585 _ => request.duration_ms,
586 };
587 bucket.generation_ms_total = bucket.generation_ms_total.saturating_add(generation_ms);
588 if let Some(ttfb) = request.ttfb_ms.filter(|value| *value > 0) {
589 bucket.ttfb_ms_total = bucket.ttfb_ms_total.saturating_add(ttfb);
590 bucket.ttfb_samples = bucket.ttfb_samples.saturating_add(1);
591 }
592}
593
594fn group_balances_by_provider(
595 provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
596) -> BTreeMap<String, Vec<&ProviderBalanceSnapshot>> {
597 let mut out = BTreeMap::<String, Vec<&ProviderBalanceSnapshot>>::new();
598 for (station_name, snapshots) in provider_balances {
599 for snapshot in snapshots {
600 out.entry(balance_provider_id(station_name, snapshot))
601 .or_default()
602 .push(snapshot);
603 }
604 }
605 out
606}
607
608fn balance_provider_id(station_name: &str, snapshot: &ProviderBalanceSnapshot) -> String {
609 if snapshot.provider_id.trim().is_empty() {
610 station_name.to_string()
611 } else {
612 snapshot.provider_id.trim().to_string()
613 }
614}
615
616fn balance_counts_for_snapshots(
617 snapshots: &[&ProviderBalanceSnapshot],
618 now_ms: u64,
619) -> UsageBalanceStatusCounts {
620 let mut counts = UsageBalanceStatusCounts::default();
621 for snapshot in snapshots {
622 counts.record(classify_snapshot(snapshot, now_ms));
623 }
624 counts
625}
626
627fn classify_snapshot(snapshot: &ProviderBalanceSnapshot, now_ms: u64) -> UsageBalanceStatus {
628 match snapshot.status_at(now_ms) {
629 BalanceSnapshotStatus::Ok if snapshot.unlimited_quota == Some(true) => {
630 UsageBalanceStatus::Unlimited
631 }
632 BalanceSnapshotStatus::Ok => UsageBalanceStatus::Ok,
633 BalanceSnapshotStatus::Exhausted => UsageBalanceStatus::Exhausted,
634 BalanceSnapshotStatus::Stale => UsageBalanceStatus::Stale,
635 BalanceSnapshotStatus::Error => UsageBalanceStatus::Error,
636 BalanceSnapshotStatus::Unknown => UsageBalanceStatus::Unknown,
637 }
638}
639
640fn summarize_snapshot(
641 snapshot: &ProviderBalanceSnapshot,
642 now_ms: u64,
643) -> UsageBalanceSnapshotSummary {
644 UsageBalanceSnapshotSummary {
645 provider_id: snapshot.provider_id.clone(),
646 station_name: snapshot.station_name.clone(),
647 upstream_index: snapshot.upstream_index,
648 source: snapshot.source.clone(),
649 status: classify_snapshot(snapshot, now_ms),
650 amount_summary: snapshot.amount_summary(),
651 fetched_at_ms: snapshot.fetched_at_ms,
652 stale_after_ms: snapshot.stale_after_ms,
653 age_ms: (now_ms >= snapshot.fetched_at_ms).then(|| now_ms - snapshot.fetched_at_ms),
654 error: snapshot.error.clone(),
655 exhaustion_affects_routing: snapshot.exhaustion_affects_routing,
656 routing_exhausted: snapshot.exhaustion_affects_routing
657 && snapshot.status_at(now_ms) == BalanceSnapshotStatus::Exhausted,
658 routing_ignored_exhaustion: !snapshot.exhaustion_affects_routing
659 && snapshot.status_at(now_ms) == BalanceSnapshotStatus::Exhausted,
660 }
661}
662
663fn primary_balance_snapshot<'a>(
664 snapshots: &'a [&ProviderBalanceSnapshot],
665 now_ms: u64,
666) -> Option<&'a ProviderBalanceSnapshot> {
667 snapshots.iter().copied().min_by(|left, right| {
668 snapshot_display_rank(classify_snapshot(left, now_ms))
669 .cmp(&snapshot_display_rank(classify_snapshot(right, now_ms)))
670 .then_with(|| left.upstream_index.cmp(&right.upstream_index))
671 .then_with(|| right.fetched_at_ms.cmp(&left.fetched_at_ms))
672 })
673}
674
675fn snapshot_display_rank(status: UsageBalanceStatus) -> u8 {
676 match status {
677 UsageBalanceStatus::Ok | UsageBalanceStatus::Unlimited => 0,
678 UsageBalanceStatus::Stale => 1,
679 UsageBalanceStatus::Unknown | UsageBalanceStatus::Error => 2,
680 UsageBalanceStatus::Exhausted => 3,
681 }
682}
683
684fn latest_balance_error(snapshots: &[&ProviderBalanceSnapshot]) -> Option<String> {
685 snapshots
686 .iter()
687 .filter_map(|snapshot| {
688 snapshot
689 .error
690 .as_deref()
691 .map(str::trim)
692 .filter(|value| !value.is_empty())
693 .map(|error| (snapshot.fetched_at_ms, error.to_string()))
694 })
695 .max_by_key(|(fetched_at_ms, _)| *fetched_at_ms)
696 .map(|(_, error)| error)
697}
698
/// Returns `num / den` in thousandths, clamped to `0..=1000`; `None` when `den` is zero.
///
/// The multiplication is done in `u128`. With `u64` saturating arithmetic, counts above
/// `u64::MAX / 1000` would saturate and collapse the ratio (for example, 100% reported as 1‰).
fn per_mille(num: u64, den: u64) -> Option<u16> {
    if den == 0 {
        return None;
    }
    let scaled = u128::from(num) * 1000 / u128::from(den);
    // Clamped to 1000, so the narrowing conversion always succeeds.
    Some(u16::try_from(scaled.min(1000)).unwrap_or(1000))
}
702
703fn output_tokens_per_second(bucket: &UsageBucket) -> Option<u64> {
704 let output = bucket.usage.output_tokens.max(0) as u64;
705 if output == 0 || bucket.generation_ms_total == 0 {
706 return None;
707 }
708 Some(output.saturating_mul(1000) / bucket.generation_ms_total)
709}
710
711fn sort_provider_rows(rows: &mut [UsageBalanceProviderRow]) {
712 rows.sort_by(|left, right| {
713 provider_selected_rank(right)
714 .cmp(&provider_selected_rank(left))
715 .then_with(|| {
716 provider_attention_rank(left.balance_status)
717 .cmp(&provider_attention_rank(right.balance_status))
718 })
719 .then_with(|| provider_cost_sort_value(right).cmp(&provider_cost_sort_value(left)))
720 .then_with(|| right.usage.requests_total.cmp(&left.usage.requests_total))
721 .then_with(|| left.provider_id.cmp(&right.provider_id))
722 });
723}
724
725fn provider_selected_rank(row: &UsageBalanceProviderRow) -> u8 {
726 u8::from(row.routing.selected)
727}
728
729fn provider_attention_rank(status: UsageBalanceStatus) -> u8 {
730 match status {
731 UsageBalanceStatus::Error => 0,
732 UsageBalanceStatus::Exhausted => 1,
733 UsageBalanceStatus::Stale => 2,
734 UsageBalanceStatus::Unknown => 3,
735 UsageBalanceStatus::Unlimited => 4,
736 UsageBalanceStatus::Ok => 5,
737 }
738}
739
740fn provider_cost_sort_value(row: &UsageBalanceProviderRow) -> i128 {
741 row.cost_total_usd
742 .as_deref()
743 .and_then(UsdAmount::from_decimal_str)
744 .map(UsdAmount::femto_usd)
745 .unwrap_or(0)
746}
747
748fn sort_endpoint_rows(rows: &mut [UsageBalanceEndpointRow]) {
749 rows.sort_by(|left, right| {
750 u8::from(right.route_selected)
751 .cmp(&u8::from(left.route_selected))
752 .then_with(|| {
753 provider_attention_rank(left.balance_status)
754 .cmp(&provider_attention_rank(right.balance_status))
755 })
756 .then_with(|| right.usage.requests_total.cmp(&left.usage.requests_total))
757 .then_with(|| left.provider_id.cmp(&right.provider_id))
758 .then_with(|| left.endpoint_id.cmp(&right.endpoint_id))
759 });
760}
761
/// Identity of an endpoint row.
///
/// The derived `Ord` compares provider id, then endpoint id. It therefore fixes the order
/// in which `build_endpoint_rows` emits rows, before the display sort.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct EndpointKey {
    provider_id: String,
    endpoint_id: String,
}
767
/// Mutable per-endpoint accumulator used while merging route candidates, recent requests,
/// and balance snapshots; converted into a row by [`EndpointAccum::finish`].
#[derive(Debug, Clone)]
struct EndpointAccum {
    key: EndpointKey,
    station_name: Option<String>,
    upstream_index: Option<usize>,
    base_url: Option<String>,
    /// Built only from recent finished requests.
    usage: UsageBucket,
    /// Balance summary currently preferred for this endpoint.
    balance: Option<UsageBalanceSnapshotSummary>,
    /// True if any route candidate for this endpoint was selected.
    route_selected: bool,
    /// Skip-reason codes; the set keeps them sorted and de-duplicated.
    route_skip_reasons: BTreeSet<String>,
}
779
780impl EndpointAccum {
781 fn new(key: EndpointKey) -> Self {
782 Self {
783 key,
784 station_name: None,
785 upstream_index: None,
786 base_url: None,
787 usage: UsageBucket::default(),
788 balance: None,
789 route_selected: false,
790 route_skip_reasons: BTreeSet::new(),
791 }
792 }
793
794 fn finish(self) -> UsageBalanceEndpointRow {
795 let balance_status = self
796 .balance
797 .as_ref()
798 .map(|balance| balance.status)
799 .unwrap_or(UsageBalanceStatus::Unknown);
800 UsageBalanceEndpointRow {
801 provider_id: self.key.provider_id,
802 endpoint_id: self.key.endpoint_id,
803 station_name: self.station_name,
804 upstream_index: self.upstream_index,
805 base_url: self.base_url,
806 usage: self.usage,
807 balance_status,
808 balance: self.balance,
809 route_selected: self.route_selected,
810 route_skip_reasons: self.route_skip_reasons.into_iter().collect(),
811 }
812 }
813}
814
/// Flattened copy of the `RoutingExplainCandidate` fields this module needs.
#[derive(Debug, Clone)]
struct RouteCandidateView {
    provider_id: String,
    endpoint_id: String,
    /// The candidate's `upstream_base_url`, copied verbatim.
    base_url: String,
    selected: bool,
    /// Skip-reason codes, as returned by each reason's `code()`.
    skip_reasons: Vec<String>,
    /// `(station_name, upstream_index)` from the candidate's compatibility info, if any.
    compatibility: Option<(String, usize)>,
}
824
/// Lookup tables derived from a routing explanation, built by [`RouteIndex::from_explain`].
#[derive(Debug, Clone, Default)]
struct RouteIndex {
    /// Candidates keyed by `provider_endpoint_key`; a later candidate with the same key
    /// replaces an earlier one.
    candidates: BTreeMap<String, RouteCandidateView>,
    /// Routing impact per provider id.
    provider_impacts: BTreeMap<String, UsageBalanceRouteImpact>,
    /// `(provider_id, upstream_base_url)` -> endpoint id; used to attribute recent requests.
    endpoint_by_provider_base_url: BTreeMap<(String, String), String>,
    /// `(provider_id, station_name, upstream_index)` -> endpoint id, for candidates that
    /// carry compatibility info; used to attribute balance snapshots.
    endpoint_by_provider_station_upstream: BTreeMap<(String, String, usize), String>,
}
832
833impl RouteIndex {
834 fn from_explain(explain: Option<&RoutingExplainResponse>) -> Self {
835 let mut out = Self::default();
836 let Some(explain) = explain else {
837 return out;
838 };
839
840 for candidate in &explain.candidates {
841 out.record_candidate(candidate);
842 }
843 for impact in out.provider_impacts.values_mut() {
844 impact.skip_reasons.sort();
845 impact.skip_reasons.dedup();
846 impact.route_paths.sort();
847 impact.route_paths.dedup();
848 }
849 out
850 }
851
852 fn record_candidate(&mut self, candidate: &RoutingExplainCandidate) {
853 let skip_reasons = candidate
854 .skip_reasons
855 .iter()
856 .map(|reason| reason.code().to_string())
857 .collect::<Vec<_>>();
858 let compatibility = candidate.compatibility.as_ref().map(|compatibility| {
859 (
860 compatibility.station_name.clone(),
861 compatibility.upstream_index,
862 )
863 });
864 let view = RouteCandidateView {
865 provider_id: candidate.provider_id.clone(),
866 endpoint_id: candidate.endpoint_id.clone(),
867 base_url: candidate.upstream_base_url.clone(),
868 selected: candidate.selected,
869 skip_reasons: skip_reasons.clone(),
870 compatibility: compatibility.clone(),
871 };
872
873 self.endpoint_by_provider_base_url.insert(
874 (
875 candidate.provider_id.clone(),
876 candidate.upstream_base_url.clone(),
877 ),
878 candidate.endpoint_id.clone(),
879 );
880 if let Some((station_name, upstream_index)) = compatibility {
881 self.endpoint_by_provider_station_upstream.insert(
882 (candidate.provider_id.clone(), station_name, upstream_index),
883 candidate.endpoint_id.clone(),
884 );
885 }
886
887 let impact = self
888 .provider_impacts
889 .entry(candidate.provider_id.clone())
890 .or_insert_with(|| UsageBalanceRouteImpact {
891 provider_id: candidate.provider_id.clone(),
892 ..UsageBalanceRouteImpact::default()
893 });
894 impact.candidate_count += 1;
895 impact.route_paths.push(candidate.route_path.clone());
896 impact.skip_reasons.extend(skip_reasons);
897 if candidate.selected {
898 impact.selected = true;
899 impact.selected_endpoint_id = Some(candidate.endpoint_id.clone());
900 impact.selected_provider_endpoint_key = Some(candidate.provider_endpoint_key.clone());
901 }
902
903 self.candidates
904 .insert(candidate.provider_endpoint_key.clone(), view);
905 }
906}
907
/// Derives a display endpoint id from a base URL: its authority (`host[:port]`).
///
/// The scheme is optional. The authority ends at the first `/`, `?`, or `#`
/// (RFC 3986 §3.2). Any `user:password@` prefix is dropped, so credentials embedded in
/// the URL never appear in an id. Returns `"endpoint"` when no host remains.
fn endpoint_id_from_base_url(base_url: &str) -> String {
    let trimmed = base_url.trim();
    let after_scheme = trimmed
        .split_once("://")
        .map_or(trimmed, |(_, rest)| rest);
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");
    // The last '@' delimits userinfo; a literal '@' inside userinfo must be percent-encoded.
    let host = authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
        .trim();
    if host.is_empty() {
        "endpoint".to_string()
    } else {
        host.to_string()
    }
}
922
/// Returns `Some(value)` unchanged (not trimmed) unless it is empty or only whitespace.
fn non_empty(value: String) -> Option<String> {
    if value.trim().is_empty() {
        None
    } else {
        Some(value)
    }
}
926
927#[cfg(test)]
928mod tests {
929 use super::*;
930 use crate::pricing::CostBreakdown;
931 use crate::routing_explain::{
932 RoutingExplainCandidate, RoutingExplainResponse, RoutingExplainSkipReason,
933 };
934 use crate::usage::UsageMetrics;
935
936 fn build_view(
937 usage_rollup: &UsageRollupView,
938 provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
939 recent: &[FinishedRequest],
940 routing_explain: Option<&RoutingExplainResponse>,
941 ) -> UsageBalanceView {
942 UsageBalanceView::build(UsageBalanceBuildInput {
943 service_name: "codex",
944 window_days: 7,
945 generated_at_ms: 1_000,
946 usage_rollup,
947 provider_balances,
948 recent,
949 routing_explain,
950 refresh: UsageBalanceRefreshInput::default(),
951 })
952 }
953
954 #[test]
955 fn balance_status_counts_keep_unknown_stale_exhausted_error_and_unlimited_distinct() {
956 let mut balances = HashMap::new();
957 balances.insert(
958 "station".to_string(),
959 vec![
960 ProviderBalanceSnapshot {
961 provider_id: "ok".to_string(),
962 exhausted: Some(false),
963 total_balance_usd: Some("3".to_string()),
964 fetched_at_ms: 900,
965 ..ProviderBalanceSnapshot::default()
966 },
967 ProviderBalanceSnapshot {
968 provider_id: "unlimited".to_string(),
969 exhausted: Some(false),
970 unlimited_quota: Some(true),
971 fetched_at_ms: 900,
972 ..ProviderBalanceSnapshot::default()
973 },
974 ProviderBalanceSnapshot {
975 provider_id: "stale".to_string(),
976 exhausted: Some(false),
977 stale_after_ms: Some(950),
978 fetched_at_ms: 900,
979 ..ProviderBalanceSnapshot::default()
980 },
981 ProviderBalanceSnapshot {
982 provider_id: "exhausted".to_string(),
983 exhausted: Some(true),
984 fetched_at_ms: 900,
985 ..ProviderBalanceSnapshot::default()
986 },
987 ProviderBalanceSnapshot {
988 provider_id: "error".to_string(),
989 error: Some("lookup failed".to_string()),
990 fetched_at_ms: 900,
991 ..ProviderBalanceSnapshot::default()
992 },
993 ProviderBalanceSnapshot {
994 provider_id: "unknown".to_string(),
995 fetched_at_ms: 900,
996 ..ProviderBalanceSnapshot::default()
997 },
998 ],
999 );
1000
1001 let view = build_view(&UsageRollupView::default(), &balances, &[], None);
1002
1003 assert_eq!(view.totals.balance_status_counts.ok, 1);
1004 assert_eq!(view.totals.balance_status_counts.unlimited, 1);
1005 assert_eq!(view.totals.balance_status_counts.stale, 1);
1006 assert_eq!(view.totals.balance_status_counts.exhausted, 1);
1007 assert_eq!(view.totals.balance_status_counts.error, 1);
1008 assert_eq!(view.totals.balance_status_counts.unknown, 1);
1009 assert_eq!(
1010 view.provider_rows
1011 .iter()
1012 .find(|row| row.provider_id == "unlimited")
1013 .map(|row| row.balance_status),
1014 Some(UsageBalanceStatus::Unlimited)
1015 );
1016 assert_eq!(
1017 view.provider_rows
1018 .iter()
1019 .find(|row| row.provider_id == "unknown")
1020 .map(|row| row.balance_status),
1021 Some(UsageBalanceStatus::Unknown)
1022 );
1023 }
1024
1025 #[test]
1026 fn provider_rows_prefer_route_selection_then_attention_and_usage() {
1027 let rollup = UsageRollupView {
1028 by_provider: vec![
1029 (
1030 "cheap".to_string(),
1031 UsageBucket {
1032 requests_total: 2,
1033 ..UsageBucket::default()
1034 },
1035 ),
1036 (
1037 "selected".to_string(),
1038 UsageBucket {
1039 requests_total: 1,
1040 ..UsageBucket::default()
1041 },
1042 ),
1043 ],
1044 ..UsageRollupView::default()
1045 };
1046 let explain = RoutingExplainResponse {
1047 api_version: 1,
1048 service_name: "codex".to_string(),
1049 runtime_loaded_at_ms: None,
1050 request_model: None,
1051 session_id: None,
1052 request_context: Default::default(),
1053 selected_route: None,
1054 candidates: vec![RoutingExplainCandidate {
1055 provider_id: "selected".to_string(),
1056 provider_alias: None,
1057 endpoint_id: "default".to_string(),
1058 provider_endpoint_key: "codex:selected:default".to_string(),
1059 route_path: vec!["main".to_string(), "selected".to_string()],
1060 preference_group: 0,
1061 compatibility: None,
1062 upstream_base_url: "https://selected.example/v1".to_string(),
1063 selected: true,
1064 skip_reasons: Vec::new(),
1065 }],
1066 affinity_policy: "off".to_string(),
1067 affinity: None,
1068 conditional_routes: Vec::new(),
1069 };
1070
1071 let view = build_view(&rollup, &HashMap::new(), &[], Some(&explain));
1072
1073 assert_eq!(view.provider_rows[0].provider_id, "selected");
1074 assert!(view.provider_rows[0].routing.selected);
1075 }
1076
1077 #[test]
1078 fn endpoint_rows_merge_recent_usage_balance_and_route_skip_reasons() {
1079 let recent = vec![FinishedRequest {
1080 id: 1,
1081 trace_id: None,
1082 session_id: None,
1083 client_name: None,
1084 client_addr: None,
1085 cwd: None,
1086 model: Some("gpt-test".to_string()),
1087 reasoning_effort: None,
1088 service_tier: None,
1089 station_name: Some("station-a".to_string()),
1090 provider_id: Some("right".to_string()),
1091 upstream_base_url: Some("https://right.example/v1".to_string()),
1092 route_decision: None,
1093 usage: Some(UsageMetrics {
1094 input_tokens: 10,
1095 output_tokens: 20,
1096 total_tokens: 30,
1097 ..UsageMetrics::default()
1098 }),
1099 cost: CostBreakdown::unknown(),
1100 retry: None,
1101 observability: Default::default(),
1102 service: "codex".to_string(),
1103 method: "POST".to_string(),
1104 path: "/v1/responses".to_string(),
1105 status_code: 429,
1106 duration_ms: 120,
1107 ttfb_ms: Some(20),
1108 streaming: false,
1109 ended_at_ms: 950,
1110 }];
1111 let mut balances = HashMap::new();
1112 balances.insert(
1113 "station-a".to_string(),
1114 vec![ProviderBalanceSnapshot {
1115 provider_id: "right".to_string(),
1116 station_name: Some("station-a".to_string()),
1117 upstream_index: Some(0),
1118 exhausted: Some(true),
1119 fetched_at_ms: 940,
1120 ..ProviderBalanceSnapshot::default()
1121 }],
1122 );
1123 let explain = RoutingExplainResponse {
1124 api_version: 1,
1125 service_name: "codex".to_string(),
1126 runtime_loaded_at_ms: None,
1127 request_model: None,
1128 session_id: None,
1129 request_context: Default::default(),
1130 selected_route: None,
1131 candidates: vec![RoutingExplainCandidate {
1132 provider_id: "right".to_string(),
1133 provider_alias: None,
1134 endpoint_id: "default".to_string(),
1135 provider_endpoint_key: "codex:right:default".to_string(),
1136 route_path: vec!["main".to_string(), "right".to_string()],
1137 preference_group: 0,
1138 compatibility: Some(crate::routing_explain::RoutingExplainCompatibility {
1139 station_name: "station-a".to_string(),
1140 upstream_index: 0,
1141 }),
1142 upstream_base_url: "https://right.example/v1".to_string(),
1143 selected: false,
1144 skip_reasons: vec![RoutingExplainSkipReason::UsageExhausted],
1145 }],
1146 affinity_policy: "off".to_string(),
1147 affinity: None,
1148 conditional_routes: Vec::new(),
1149 };
1150
1151 let view = build_view(
1152 &UsageRollupView::default(),
1153 &balances,
1154 &recent,
1155 Some(&explain),
1156 );
1157 let endpoint = view
1158 .endpoint_rows
1159 .iter()
1160 .find(|row| row.provider_id == "right")
1161 .expect("right endpoint");
1162
1163 assert_eq!(endpoint.endpoint_id, "default");
1164 assert_eq!(endpoint.usage.requests_total, 1);
1165 assert_eq!(endpoint.usage.requests_error, 1);
1166 assert_eq!(endpoint.balance_status, UsageBalanceStatus::Exhausted);
1167 assert_eq!(endpoint.route_skip_reasons, vec!["usage_exhausted"]);
1168 }
1169}