Skip to main content

codex_helper_core/
usage_balance.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2
3use serde::{Deserialize, Serialize};
4
5use crate::pricing::{CostConfidence, UsdAmount};
6use crate::routing_explain::{RoutingExplainCandidate, RoutingExplainResponse};
7use crate::state::{
8    BalanceSnapshotStatus, FinishedRequest, ProviderBalanceSnapshot, UsageBucket, UsageRollupView,
9};
10use crate::usage_providers::UsageProviderRefreshSummary;
11
/// Balance state shown for a snapshot, endpoint, or provider in the usage/balance view.
///
/// Serialized with `snake_case` variant names; `Unknown` is the `Default`.
/// `classify_snapshot` derives the value for a single snapshot.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, Hash)]
#[serde(rename_all = "snake_case")]
pub enum UsageBalanceStatus {
    /// No usable status is available (default value).
    #[default]
    Unknown,
    /// Snapshot status is `Ok` and the quota is not flagged as unlimited.
    Ok,
    /// Snapshot status is `Ok` and `unlimited_quota == Some(true)`.
    Unlimited,
    /// Snapshot status is `Exhausted`.
    Exhausted,
    /// Snapshot status is `Stale`.
    Stale,
    /// Snapshot status is `Error`.
    Error,
}
23
24impl UsageBalanceStatus {
25    pub fn as_str(self) -> &'static str {
26        match self {
27            UsageBalanceStatus::Unknown => "unknown",
28            UsageBalanceStatus::Ok => "ok",
29            UsageBalanceStatus::Unlimited => "unlimited",
30            UsageBalanceStatus::Exhausted => "exhausted",
31            UsageBalanceStatus::Stale => "stale",
32            UsageBalanceStatus::Error => "error",
33        }
34    }
35
36    pub fn is_attention(self) -> bool {
37        matches!(
38            self,
39            UsageBalanceStatus::Unknown
40                | UsageBalanceStatus::Exhausted
41                | UsageBalanceStatus::Stale
42                | UsageBalanceStatus::Error
43        )
44    }
45}
46
/// Number of balance snapshots observed per [`UsageBalanceStatus`].
///
/// All counters start at zero via `Default`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UsageBalanceStatusCounts {
    /// Snapshots classified as `Ok`.
    pub ok: usize,
    /// Snapshots classified as `Unlimited`.
    pub unlimited: usize,
    /// Snapshots classified as `Exhausted`.
    pub exhausted: usize,
    /// Snapshots classified as `Stale`.
    pub stale: usize,
    /// Snapshots classified as `Error`.
    pub error: usize,
    /// Snapshots classified as `Unknown`.
    pub unknown: usize,
}
56
57impl UsageBalanceStatusCounts {
58    pub fn total(&self) -> usize {
59        self.ok
60            .saturating_add(self.unlimited)
61            .saturating_add(self.exhausted)
62            .saturating_add(self.stale)
63            .saturating_add(self.error)
64            .saturating_add(self.unknown)
65    }
66
67    pub fn record(&mut self, status: UsageBalanceStatus) {
68        match status {
69            UsageBalanceStatus::Ok => self.ok += 1,
70            UsageBalanceStatus::Unlimited => self.unlimited += 1,
71            UsageBalanceStatus::Exhausted => self.exhausted += 1,
72            UsageBalanceStatus::Stale => self.stale += 1,
73            UsageBalanceStatus::Error => self.error += 1,
74            UsageBalanceStatus::Unknown => self.unknown += 1,
75        }
76    }
77
78    pub fn aggregate_status(&self) -> UsageBalanceStatus {
79        if self.total() == 0 {
80            UsageBalanceStatus::Unknown
81        } else if self.error > 0 {
82            UsageBalanceStatus::Error
83        } else if self.exhausted > 0 {
84            UsageBalanceStatus::Exhausted
85        } else if self.stale > 0 {
86            UsageBalanceStatus::Stale
87        } else if self.unknown > 0 {
88            UsageBalanceStatus::Unknown
89        } else if self.unlimited > 0 {
90            UsageBalanceStatus::Unlimited
91        } else {
92            UsageBalanceStatus::Ok
93        }
94    }
95}
96
/// Combined usage, balance, and routing view produced by [`UsageBalanceView::build`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceView {
    /// Service name copied from the build input.
    pub service_name: String,
    /// Window length in days, copied from the build input.
    pub window_days: usize,
    /// Timestamp (ms) copied from the build input; also used as "now" when
    /// classifying balance snapshots.
    pub generated_at_ms: u64,
    /// Window-level totals plus snapshot status counts.
    pub totals: UsageBalanceTotals,
    /// One row per provider, already sorted for display.
    pub provider_rows: Vec<UsageBalanceProviderRow>,
    /// One row per provider endpoint, already sorted for display.
    pub endpoint_rows: Vec<UsageBalanceEndpointRow>,
    /// Per-provider routing impacts taken from the routing index.
    pub routing_impacts: Vec<UsageBalanceRouteImpact>,
    /// Balance refresh state and snapshot statistics.
    pub refresh_status: UsageBalanceRefreshStatus,
    /// Warnings for the consumer; `build` currently leaves this empty, and
    /// the field is omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
110
/// Window-wide totals, filled in by `build_totals` from the rollup window.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceTotals {
    /// Total requests in the window.
    pub requests_total: u64,
    /// Requests in the window counted as errors.
    pub requests_error: u64,
    /// Share of non-error requests in thousandths; `None` when there were no requests.
    pub success_per_mille: Option<u16>,
    /// Input tokens for the window.
    pub input_tokens: i64,
    /// Value of the window's `cache_read_tokens_total()`.
    pub cached_input_tokens: i64,
    /// Output tokens for the window.
    pub output_tokens: i64,
    /// Value of the window's `reasoning_output_tokens_total()`.
    pub reasoning_output_tokens: i64,
    /// Total tokens for the window.
    pub total_tokens: i64,
    /// Total cost string copied from the window cost summary, when present.
    pub cost_total_usd: Option<String>,
    /// Display string produced by the window cost summary's `display_total()`.
    pub cost_display: String,
    /// Confidence reported by the window cost summary.
    pub cost_confidence: CostConfidence,
    /// Status counts over every balance snapshot of every provider.
    pub balance_status_counts: UsageBalanceStatusCounts,
}
126
/// Per-provider row of the usage/balance view, assembled by `build_provider_rows`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceProviderRow {
    /// Provider identifier.
    pub provider_id: String,
    /// Display label; `build_provider_rows` currently sets it to the provider id.
    pub display_name: String,
    /// Usage bucket from the rollup for this provider (default bucket when absent).
    pub usage: UsageBucket,
    /// Share of non-error requests in thousandths; `None` without requests.
    pub success_per_mille: Option<u16>,
    /// Output tokens per second over the bucket's generation time, if computable.
    pub output_tokens_per_second: Option<u64>,
    /// `ttfb_ms_total / ttfb_samples`; `None` when there are no samples.
    pub avg_ttfb_ms: Option<u64>,
    /// Total cost string copied from the bucket's cost summary, when present.
    pub cost_total_usd: Option<String>,
    /// Display string produced by the bucket cost summary's `display_total()`.
    pub cost_display: String,
    /// Confidence reported by the bucket's cost summary.
    pub cost_confidence: CostConfidence,
    /// Aggregate of `balance_counts` (see `UsageBalanceStatusCounts::aggregate_status`).
    pub balance_status: UsageBalanceStatus,
    /// Status counts over this provider's balance snapshots.
    pub balance_counts: UsageBalanceStatusCounts,
    /// Summary of the snapshot chosen by `primary_balance_snapshot`, if any.
    pub primary_balance: Option<UsageBalanceSnapshotSummary>,
    /// Non-empty error text of the most recently fetched snapshot that has one.
    pub latest_balance_error: Option<String>,
    /// Age of `primary_balance` in milliseconds, when known.
    pub balance_age_ms: Option<u64>,
    /// Routing impact for this provider; a default entry carrying only the
    /// provider id when the routing index has none.
    pub routing: UsageBalanceRouteImpact,
    /// Number of endpoint rows belonging to this provider.
    pub endpoint_count: usize,
    /// Number of those endpoint rows that have a balance summary.
    pub endpoints_with_balance: usize,
    /// Sum of `usage.requests_total` over this provider's endpoint rows.
    pub recent_endpoint_requests: u64,
}
148
149impl UsageBalanceProviderRow {
150    pub fn needs_attention(&self) -> bool {
151        self.balance_status.is_attention()
152            || self
153                .latest_balance_error
154                .as_deref()
155                .is_some_and(|value| !value.trim().is_empty())
156            || self.usage.requests_error > 0
157    }
158}
159
/// Per-endpoint row of the usage/balance view, produced by `EndpointAccum::finish`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceEndpointRow {
    /// Provider that owns the endpoint.
    pub provider_id: String,
    /// Endpoint identifier within the provider.
    pub endpoint_id: String,
    /// Station name taken from a routing candidate or balance snapshot, if any.
    pub station_name: Option<String>,
    /// Upstream index taken from a routing candidate or balance snapshot, if any.
    pub upstream_index: Option<usize>,
    /// Base URL taken from a routing candidate or a recent request, if any.
    pub base_url: Option<String>,
    /// Usage accumulated from the recent finished requests mapped to this endpoint.
    pub usage: UsageBucket,
    /// Status of `balance`, or `Unknown` when there is no balance summary.
    pub balance_status: UsageBalanceStatus,
    /// Balance summary kept for this endpoint, if any snapshot mapped to it.
    pub balance: Option<UsageBalanceSnapshotSummary>,
    /// Whether any routing candidate for this endpoint was selected.
    pub route_selected: bool,
    /// Skip-reason codes from routing candidates, deduplicated and sorted.
    pub route_skip_reasons: Vec<String>,
}
173
/// Serializable summary of one balance snapshot, produced by `summarize_snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UsageBalanceSnapshotSummary {
    /// Provider id copied verbatim from the snapshot.
    pub provider_id: String,
    /// Station name copied from the snapshot.
    pub station_name: Option<String>,
    /// Upstream index copied from the snapshot.
    pub upstream_index: Option<usize>,
    /// Source label copied from the snapshot.
    pub source: String,
    /// Status of the snapshot as classified at summary time.
    pub status: UsageBalanceStatus,
    /// Text returned by the snapshot's `amount_summary()`.
    pub amount_summary: String,
    /// Fetch timestamp (ms) copied from the snapshot.
    pub fetched_at_ms: u64,
    /// Staleness threshold copied from the snapshot.
    pub stale_after_ms: Option<u64>,
    /// `now - fetched_at_ms`; `None` when the fetch timestamp is later than "now".
    pub age_ms: Option<u64>,
    /// Error text copied from the snapshot.
    pub error: Option<String>,
    /// Copied from the snapshot's flag of the same name.
    pub exhaustion_affects_routing: bool,
    /// The snapshot is exhausted and `exhaustion_affects_routing` is set.
    pub routing_exhausted: bool,
    /// The snapshot is exhausted but `exhaustion_affects_routing` is not set.
    pub routing_ignored_exhaustion: bool,
}
190
/// Routing outcome for one provider, held in `RouteIndex::provider_impacts`.
///
/// `RouteIndex::from_explain` sorts and deduplicates `skip_reasons` and
/// `route_paths` after all candidates were recorded.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UsageBalanceRouteImpact {
    /// Provider this impact belongs to.
    pub provider_id: String,
    /// Whether the provider was selected (presumably: any of its candidates
    /// was selected — TODO confirm against `RouteIndex::record_candidate`).
    pub selected: bool,
    /// Endpoint id of the selected candidate, if any.
    pub selected_endpoint_id: Option<String>,
    /// Provider/endpoint key of the selected candidate, if any.
    pub selected_provider_endpoint_key: Option<String>,
    /// Number of routing candidates seen for the provider.
    pub candidate_count: usize,
    /// Skip-reason codes collected for the provider.
    pub skip_reasons: Vec<String>,
    /// Route paths collected for the provider.
    pub route_paths: Vec<Vec<String>>,
}
201
/// Balance refresh state plus snapshot statistics, built by `build_refresh_status`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UsageBalanceRefreshStatus {
    /// Copied from the refresh input.
    pub refreshing: bool,
    /// Number of balance snapshots across all providers.
    pub total_snapshots: usize,
    /// Largest `fetched_at_ms` among the snapshots; `None` without snapshots.
    pub latest_fetched_at_ms: Option<u64>,
    /// Trimmed, non-empty error text of the most recently fetched snapshot that has one.
    pub latest_error: Option<String>,
    /// Provider id of the snapshot behind `latest_error`, falling back to its
    /// station name; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub latest_error_provider_id: Option<String>,
    /// Copied from the refresh input.
    pub last_message: Option<String>,
    /// Copied from the refresh input.
    pub last_error: Option<String>,
    /// Copied from the refresh input; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_provider_refresh: Option<UsageProviderRefreshSummary>,
    /// Status counts over every balance snapshot.
    pub status_counts: UsageBalanceStatusCounts,
}
216
/// Caller-supplied refresh state that `build_refresh_status` copies into
/// [`UsageBalanceRefreshStatus`].
#[derive(Debug, Clone, Default)]
pub struct UsageBalanceRefreshInput {
    /// Becomes `UsageBalanceRefreshStatus::refreshing`.
    pub refreshing: bool,
    /// Becomes `UsageBalanceRefreshStatus::last_message`.
    pub last_message: Option<String>,
    /// Becomes `UsageBalanceRefreshStatus::last_error`.
    pub last_error: Option<String>,
    /// Becomes `UsageBalanceRefreshStatus::last_provider_refresh`.
    pub last_provider_refresh: Option<UsageProviderRefreshSummary>,
}
224
/// Everything [`UsageBalanceView::build`] needs to assemble a view.
pub struct UsageBalanceBuildInput<'a> {
    /// Service name copied into the view.
    pub service_name: &'a str,
    /// Window length in days copied into the view.
    pub window_days: usize,
    /// Timestamp (ms) copied into the view and used as "now" for snapshot classification.
    pub generated_at_ms: u64,
    /// Usage rollup supplying the window totals and per-provider buckets.
    pub usage_rollup: &'a UsageRollupView,
    /// Balance snapshots keyed by a name that the builders treat as the station name.
    pub provider_balances: &'a HashMap<String, Vec<ProviderBalanceSnapshot>>,
    /// Recently finished requests used for per-endpoint usage.
    pub recent: &'a [FinishedRequest],
    /// Optional routing explanation used to build the routing index.
    pub routing_explain: Option<&'a RoutingExplainResponse>,
    /// Refresh state forwarded into the refresh status.
    pub refresh: UsageBalanceRefreshInput,
}
235
236impl UsageBalanceView {
237    pub fn build(input: UsageBalanceBuildInput<'_>) -> Self {
238        let route_index = RouteIndex::from_explain(input.routing_explain);
239        let mut endpoint_rows = build_endpoint_rows(
240            input.recent,
241            input.provider_balances,
242            &route_index,
243            input.generated_at_ms,
244        );
245        sort_endpoint_rows(&mut endpoint_rows);
246
247        let provider_rows = build_provider_rows(&input, &route_index, &endpoint_rows);
248        let totals = build_totals(
249            input.usage_rollup,
250            input.provider_balances,
251            input.generated_at_ms,
252        );
253        let refresh_status = build_refresh_status(
254            input.provider_balances,
255            input.generated_at_ms,
256            input.refresh,
257        );
258        let routing_impacts = route_index
259            .provider_impacts
260            .values()
261            .cloned()
262            .collect::<Vec<_>>();
263
264        Self {
265            service_name: input.service_name.to_string(),
266            window_days: input.window_days,
267            generated_at_ms: input.generated_at_ms,
268            totals,
269            provider_rows,
270            endpoint_rows,
271            routing_impacts,
272            refresh_status,
273            warnings: Vec::new(),
274        }
275    }
276}
277
278fn build_totals(
279    rollup: &UsageRollupView,
280    provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
281    now_ms: u64,
282) -> UsageBalanceTotals {
283    let window = &rollup.window;
284    let mut balance_status_counts = UsageBalanceStatusCounts::default();
285    for snapshot in provider_balances.values().flatten() {
286        balance_status_counts.record(classify_snapshot(snapshot, now_ms));
287    }
288
289    UsageBalanceTotals {
290        requests_total: window.requests_total,
291        requests_error: window.requests_error,
292        success_per_mille: per_mille(
293            window.requests_total.saturating_sub(window.requests_error),
294            window.requests_total,
295        ),
296        input_tokens: window.usage.input_tokens,
297        cached_input_tokens: window.usage.cache_read_tokens_total(),
298        output_tokens: window.usage.output_tokens,
299        reasoning_output_tokens: window.usage.reasoning_output_tokens_total(),
300        total_tokens: window.usage.total_tokens,
301        cost_total_usd: window.cost.total_cost_usd.clone(),
302        cost_display: window.cost.display_total(),
303        cost_confidence: window.cost.confidence,
304        balance_status_counts,
305    }
306}
307
308fn build_provider_rows(
309    input: &UsageBalanceBuildInput<'_>,
310    route_index: &RouteIndex,
311    endpoint_rows: &[UsageBalanceEndpointRow],
312) -> Vec<UsageBalanceProviderRow> {
313    let mut provider_ids = BTreeSet::new();
314    for (provider_id, _) in &input.usage_rollup.by_provider {
315        provider_ids.insert(provider_id.clone());
316    }
317    for (provider_id, _) in group_balances_by_provider(input.provider_balances) {
318        provider_ids.insert(provider_id);
319    }
320    for request in input.recent {
321        if let Some(provider_id) = request
322            .provider_id
323            .as_deref()
324            .map(str::trim)
325            .filter(|value| !value.is_empty())
326        {
327            provider_ids.insert(provider_id.to_string());
328        }
329    }
330    for provider_id in route_index.provider_impacts.keys() {
331        provider_ids.insert(provider_id.clone());
332    }
333
334    let usage_by_provider = input
335        .usage_rollup
336        .by_provider
337        .iter()
338        .cloned()
339        .collect::<BTreeMap<_, _>>();
340    let balances_by_provider = group_balances_by_provider(input.provider_balances);
341    let mut endpoint_counts = BTreeMap::<String, (usize, usize, u64)>::new();
342    for endpoint in endpoint_rows {
343        let entry = endpoint_counts
344            .entry(endpoint.provider_id.clone())
345            .or_insert((0, 0, 0));
346        entry.0 += 1;
347        if endpoint.balance.is_some() {
348            entry.1 += 1;
349        }
350        entry.2 = entry.2.saturating_add(endpoint.usage.requests_total);
351    }
352
353    let mut rows = provider_ids
354        .into_iter()
355        .map(|provider_id| {
356            let usage = usage_by_provider
357                .get(provider_id.as_str())
358                .cloned()
359                .unwrap_or_default();
360            let balances = balances_by_provider
361                .get(provider_id.as_str())
362                .cloned()
363                .unwrap_or_default();
364            let balance_counts = balance_counts_for_snapshots(&balances, input.generated_at_ms);
365            let primary_balance = primary_balance_snapshot(&balances, input.generated_at_ms)
366                .map(|snapshot| summarize_snapshot(snapshot, input.generated_at_ms));
367            let latest_balance_error = latest_balance_error(&balances);
368            let balance_age_ms = primary_balance.as_ref().and_then(|summary| summary.age_ms);
369            let balance_status = balance_counts.aggregate_status();
370            let routing = route_index
371                .provider_impacts
372                .get(provider_id.as_str())
373                .cloned()
374                .unwrap_or_else(|| UsageBalanceRouteImpact {
375                    provider_id: provider_id.clone(),
376                    ..UsageBalanceRouteImpact::default()
377                });
378            let (endpoint_count, endpoints_with_balance, recent_endpoint_requests) =
379                endpoint_counts
380                    .get(provider_id.as_str())
381                    .copied()
382                    .unwrap_or_default();
383
384            UsageBalanceProviderRow {
385                display_name: provider_id.clone(),
386                provider_id,
387                success_per_mille: per_mille(
388                    usage.requests_total.saturating_sub(usage.requests_error),
389                    usage.requests_total,
390                ),
391                output_tokens_per_second: output_tokens_per_second(&usage),
392                avg_ttfb_ms: (usage.ttfb_samples > 0)
393                    .then(|| usage.ttfb_ms_total / usage.ttfb_samples),
394                cost_total_usd: usage.cost.total_cost_usd.clone(),
395                cost_display: usage.cost.display_total(),
396                cost_confidence: usage.cost.confidence,
397                usage,
398                balance_status,
399                balance_counts,
400                primary_balance,
401                latest_balance_error,
402                balance_age_ms,
403                routing,
404                endpoint_count,
405                endpoints_with_balance,
406                recent_endpoint_requests,
407            }
408        })
409        .collect::<Vec<_>>();
410
411    sort_provider_rows(&mut rows);
412    rows
413}
414
415fn build_refresh_status(
416    provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
417    now_ms: u64,
418    input: UsageBalanceRefreshInput,
419) -> UsageBalanceRefreshStatus {
420    let mut out = UsageBalanceRefreshStatus {
421        refreshing: input.refreshing,
422        last_message: input.last_message,
423        last_error: input.last_error,
424        last_provider_refresh: input.last_provider_refresh,
425        ..UsageBalanceRefreshStatus::default()
426    };
427    let mut latest_error_fetched_at_ms: Option<u64> = None;
428
429    for snapshot in provider_balances.values().flatten() {
430        out.total_snapshots += 1;
431        out.latest_fetched_at_ms = Some(
432            out.latest_fetched_at_ms
433                .unwrap_or(0)
434                .max(snapshot.fetched_at_ms),
435        );
436        let status = classify_snapshot(snapshot, now_ms);
437        out.status_counts.record(status);
438        if let Some(error) = snapshot
439            .error
440            .as_deref()
441            .map(str::trim)
442            .filter(|v| !v.is_empty())
443            && latest_error_fetched_at_ms.is_none_or(|latest| snapshot.fetched_at_ms >= latest)
444        {
445            out.latest_error = Some(error.to_string());
446            out.latest_error_provider_id = non_empty(snapshot.provider_id.clone())
447                .or_else(|| non_empty(snapshot.station_name.clone().unwrap_or_default()));
448            latest_error_fetched_at_ms = Some(snapshot.fetched_at_ms);
449        }
450    }
451
452    out
453}
454
455fn build_endpoint_rows(
456    recent: &[FinishedRequest],
457    provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
458    route_index: &RouteIndex,
459    now_ms: u64,
460) -> Vec<UsageBalanceEndpointRow> {
461    let mut acc = BTreeMap::<EndpointKey, EndpointAccum>::new();
462
463    for candidate in route_index.candidates.values() {
464        let key = EndpointKey {
465            provider_id: candidate.provider_id.clone(),
466            endpoint_id: candidate.endpoint_id.clone(),
467        };
468        let row = acc
469            .entry(key.clone())
470            .or_insert_with(|| EndpointAccum::new(key));
471        row.base_url = non_empty(candidate.base_url.clone()).or(row.base_url.take());
472        row.route_selected |= candidate.selected;
473        row.route_skip_reasons
474            .extend(candidate.skip_reasons.iter().cloned());
475        if let Some((station_name, upstream_index)) = candidate.compatibility.as_ref() {
476            row.station_name = Some(station_name.clone());
477            row.upstream_index = Some(*upstream_index);
478        }
479    }
480
481    for request in recent {
482        let Some(provider_id) = request
483            .provider_id
484            .as_deref()
485            .map(str::trim)
486            .filter(|value| !value.is_empty())
487        else {
488            continue;
489        };
490        let base_url = request
491            .upstream_base_url
492            .as_deref()
493            .map(str::trim)
494            .filter(|value| !value.is_empty());
495        let endpoint_id = base_url
496            .and_then(|base_url| {
497                route_index
498                    .endpoint_by_provider_base_url
499                    .get(&(provider_id.to_string(), base_url.to_string()))
500                    .cloned()
501            })
502            .unwrap_or_else(|| {
503                base_url
504                    .map(endpoint_id_from_base_url)
505                    .unwrap_or_else(|| "unknown_endpoint".to_string())
506            });
507        let key = EndpointKey {
508            provider_id: provider_id.to_string(),
509            endpoint_id,
510        };
511        let row = acc
512            .entry(key.clone())
513            .or_insert_with(|| EndpointAccum::new(key));
514        if row.base_url.is_none() {
515            row.base_url = base_url.map(ToOwned::to_owned);
516        }
517        record_finished_into_bucket(&mut row.usage, request);
518    }
519
520    for (station_name, snapshots) in provider_balances {
521        for snapshot in snapshots {
522            let provider_id = balance_provider_id(station_name, snapshot);
523            let endpoint_id = snapshot
524                .upstream_index
525                .and_then(|idx| {
526                    route_index
527                        .endpoint_by_provider_station_upstream
528                        .get(&(provider_id.clone(), station_name.clone(), idx))
529                        .cloned()
530                })
531                .unwrap_or_else(|| {
532                    snapshot
533                        .upstream_index
534                        .map(|idx| format!("upstream#{idx}"))
535                        .or_else(|| non_empty(snapshot.source.clone()))
536                        .unwrap_or_else(|| "balance".to_string())
537                });
538            let key = EndpointKey {
539                provider_id: provider_id.clone(),
540                endpoint_id,
541            };
542            let row = acc
543                .entry(key.clone())
544                .or_insert_with(|| EndpointAccum::new(key));
545            row.station_name = snapshot
546                .station_name
547                .clone()
548                .or_else(|| Some(station_name.clone()))
549                .or(row.station_name.take());
550            row.upstream_index = snapshot.upstream_index.or(row.upstream_index);
551            let summary = summarize_snapshot(snapshot, now_ms);
552            let replace = row.balance.as_ref().is_none_or(|existing| {
553                snapshot_display_rank(summary.status) < snapshot_display_rank(existing.status)
554            });
555            if replace {
556                row.balance = Some(summary);
557            }
558        }
559    }
560
561    acc.into_values().map(EndpointAccum::finish).collect()
562}
563
564fn record_finished_into_bucket(bucket: &mut UsageBucket, request: &FinishedRequest) {
565    bucket.requests_total = bucket.requests_total.saturating_add(1);
566    if request.status_code >= 400 {
567        bucket.requests_error = bucket.requests_error.saturating_add(1);
568    }
569    bucket.duration_ms_total = bucket.duration_ms_total.saturating_add(request.duration_ms);
570
571    let Some(usage) = request.usage.as_ref() else {
572        return;
573    };
574
575    bucket.usage.add_assign(usage);
576    bucket.cost.record_usage_cost(&request.cost);
577    bucket.requests_with_usage = bucket.requests_with_usage.saturating_add(1);
578    bucket.duration_ms_with_usage_total = bucket
579        .duration_ms_with_usage_total
580        .saturating_add(request.duration_ms);
581    let generation_ms = match request.ttfb_ms {
582        Some(ttfb) if ttfb > 0 && ttfb < request.duration_ms => {
583            request.duration_ms.saturating_sub(ttfb)
584        }
585        _ => request.duration_ms,
586    };
587    bucket.generation_ms_total = bucket.generation_ms_total.saturating_add(generation_ms);
588    if let Some(ttfb) = request.ttfb_ms.filter(|value| *value > 0) {
589        bucket.ttfb_ms_total = bucket.ttfb_ms_total.saturating_add(ttfb);
590        bucket.ttfb_samples = bucket.ttfb_samples.saturating_add(1);
591    }
592}
593
594fn group_balances_by_provider(
595    provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
596) -> BTreeMap<String, Vec<&ProviderBalanceSnapshot>> {
597    let mut out = BTreeMap::<String, Vec<&ProviderBalanceSnapshot>>::new();
598    for (station_name, snapshots) in provider_balances {
599        for snapshot in snapshots {
600            out.entry(balance_provider_id(station_name, snapshot))
601                .or_default()
602                .push(snapshot);
603        }
604    }
605    out
606}
607
608fn balance_provider_id(station_name: &str, snapshot: &ProviderBalanceSnapshot) -> String {
609    if snapshot.provider_id.trim().is_empty() {
610        station_name.to_string()
611    } else {
612        snapshot.provider_id.trim().to_string()
613    }
614}
615
616fn balance_counts_for_snapshots(
617    snapshots: &[&ProviderBalanceSnapshot],
618    now_ms: u64,
619) -> UsageBalanceStatusCounts {
620    let mut counts = UsageBalanceStatusCounts::default();
621    for snapshot in snapshots {
622        counts.record(classify_snapshot(snapshot, now_ms));
623    }
624    counts
625}
626
627fn classify_snapshot(snapshot: &ProviderBalanceSnapshot, now_ms: u64) -> UsageBalanceStatus {
628    match snapshot.status_at(now_ms) {
629        BalanceSnapshotStatus::Ok if snapshot.unlimited_quota == Some(true) => {
630            UsageBalanceStatus::Unlimited
631        }
632        BalanceSnapshotStatus::Ok => UsageBalanceStatus::Ok,
633        BalanceSnapshotStatus::Exhausted => UsageBalanceStatus::Exhausted,
634        BalanceSnapshotStatus::Stale => UsageBalanceStatus::Stale,
635        BalanceSnapshotStatus::Error => UsageBalanceStatus::Error,
636        BalanceSnapshotStatus::Unknown => UsageBalanceStatus::Unknown,
637    }
638}
639
640fn summarize_snapshot(
641    snapshot: &ProviderBalanceSnapshot,
642    now_ms: u64,
643) -> UsageBalanceSnapshotSummary {
644    UsageBalanceSnapshotSummary {
645        provider_id: snapshot.provider_id.clone(),
646        station_name: snapshot.station_name.clone(),
647        upstream_index: snapshot.upstream_index,
648        source: snapshot.source.clone(),
649        status: classify_snapshot(snapshot, now_ms),
650        amount_summary: snapshot.amount_summary(),
651        fetched_at_ms: snapshot.fetched_at_ms,
652        stale_after_ms: snapshot.stale_after_ms,
653        age_ms: (now_ms >= snapshot.fetched_at_ms).then(|| now_ms - snapshot.fetched_at_ms),
654        error: snapshot.error.clone(),
655        exhaustion_affects_routing: snapshot.exhaustion_affects_routing,
656        routing_exhausted: snapshot.exhaustion_affects_routing
657            && snapshot.status_at(now_ms) == BalanceSnapshotStatus::Exhausted,
658        routing_ignored_exhaustion: !snapshot.exhaustion_affects_routing
659            && snapshot.status_at(now_ms) == BalanceSnapshotStatus::Exhausted,
660    }
661}
662
663fn primary_balance_snapshot<'a>(
664    snapshots: &'a [&ProviderBalanceSnapshot],
665    now_ms: u64,
666) -> Option<&'a ProviderBalanceSnapshot> {
667    snapshots.iter().copied().min_by(|left, right| {
668        snapshot_display_rank(classify_snapshot(left, now_ms))
669            .cmp(&snapshot_display_rank(classify_snapshot(right, now_ms)))
670            .then_with(|| left.upstream_index.cmp(&right.upstream_index))
671            .then_with(|| right.fetched_at_ms.cmp(&left.fetched_at_ms))
672    })
673}
674
675fn snapshot_display_rank(status: UsageBalanceStatus) -> u8 {
676    match status {
677        UsageBalanceStatus::Ok | UsageBalanceStatus::Unlimited => 0,
678        UsageBalanceStatus::Stale => 1,
679        UsageBalanceStatus::Unknown | UsageBalanceStatus::Error => 2,
680        UsageBalanceStatus::Exhausted => 3,
681    }
682}
683
684fn latest_balance_error(snapshots: &[&ProviderBalanceSnapshot]) -> Option<String> {
685    snapshots
686        .iter()
687        .filter_map(|snapshot| {
688            snapshot
689                .error
690                .as_deref()
691                .map(str::trim)
692                .filter(|value| !value.is_empty())
693                .map(|error| (snapshot.fetched_at_ms, error.to_string()))
694        })
695        .max_by_key(|(fetched_at_ms, _)| *fetched_at_ms)
696        .map(|(_, error)| error)
697}
698
/// Returns `num / den` in thousandths, rounded down and capped at 1000.
///
/// Returns `None` when `den` is zero. The multiplication is done in `u128`
/// so that large numerators are scaled exactly instead of being clamped
/// before the division.
fn per_mille(num: u64, den: u64) -> Option<u16> {
    if den == 0 {
        return None;
    }
    let scaled = u128::from(num) * 1000 / u128::from(den);
    // Capped at 1000, so the narrowing cast cannot truncate.
    Some(scaled.min(1000) as u16)
}
702
703fn output_tokens_per_second(bucket: &UsageBucket) -> Option<u64> {
704    let output = bucket.usage.output_tokens.max(0) as u64;
705    if output == 0 || bucket.generation_ms_total == 0 {
706        return None;
707    }
708    Some(output.saturating_mul(1000) / bucket.generation_ms_total)
709}
710
711fn sort_provider_rows(rows: &mut [UsageBalanceProviderRow]) {
712    rows.sort_by(|left, right| {
713        provider_selected_rank(right)
714            .cmp(&provider_selected_rank(left))
715            .then_with(|| {
716                provider_attention_rank(left.balance_status)
717                    .cmp(&provider_attention_rank(right.balance_status))
718            })
719            .then_with(|| provider_cost_sort_value(right).cmp(&provider_cost_sort_value(left)))
720            .then_with(|| right.usage.requests_total.cmp(&left.usage.requests_total))
721            .then_with(|| left.provider_id.cmp(&right.provider_id))
722    });
723}
724
725fn provider_selected_rank(row: &UsageBalanceProviderRow) -> u8 {
726    u8::from(row.routing.selected)
727}
728
729fn provider_attention_rank(status: UsageBalanceStatus) -> u8 {
730    match status {
731        UsageBalanceStatus::Error => 0,
732        UsageBalanceStatus::Exhausted => 1,
733        UsageBalanceStatus::Stale => 2,
734        UsageBalanceStatus::Unknown => 3,
735        UsageBalanceStatus::Unlimited => 4,
736        UsageBalanceStatus::Ok => 5,
737    }
738}
739
740fn provider_cost_sort_value(row: &UsageBalanceProviderRow) -> i128 {
741    row.cost_total_usd
742        .as_deref()
743        .and_then(UsdAmount::from_decimal_str)
744        .map(UsdAmount::femto_usd)
745        .unwrap_or(0)
746}
747
748fn sort_endpoint_rows(rows: &mut [UsageBalanceEndpointRow]) {
749    rows.sort_by(|left, right| {
750        u8::from(right.route_selected)
751            .cmp(&u8::from(left.route_selected))
752            .then_with(|| {
753                provider_attention_rank(left.balance_status)
754                    .cmp(&provider_attention_rank(right.balance_status))
755            })
756            .then_with(|| right.usage.requests_total.cmp(&left.usage.requests_total))
757            .then_with(|| left.provider_id.cmp(&right.provider_id))
758            .then_with(|| left.endpoint_id.cmp(&right.endpoint_id))
759    });
760}
761
/// Map key identifying one endpoint row.
///
/// The derived ordering compares `provider_id` first, then `endpoint_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct EndpointKey {
    /// Provider that owns the endpoint.
    provider_id: String,
    /// Endpoint identifier within the provider.
    endpoint_id: String,
}
767
/// Mutable accumulator for one endpoint row while `build_endpoint_rows` runs;
/// converted into a `UsageBalanceEndpointRow` by `finish`.
#[derive(Debug, Clone)]
struct EndpointAccum {
    /// Identity of the endpoint being accumulated.
    key: EndpointKey,
    /// Station name from a routing candidate or balance snapshot.
    station_name: Option<String>,
    /// Upstream index from a routing candidate or balance snapshot.
    upstream_index: Option<usize>,
    /// Base URL from a routing candidate or a recent request.
    base_url: Option<String>,
    /// Usage accumulated from recent finished requests.
    usage: UsageBucket,
    /// Balance summary currently chosen for display.
    balance: Option<UsageBalanceSnapshotSummary>,
    /// Set once any routing candidate for this endpoint was selected.
    route_selected: bool,
    /// Skip-reason codes; the set keeps them unique and sorted.
    route_skip_reasons: BTreeSet<String>,
}
779
780impl EndpointAccum {
781    fn new(key: EndpointKey) -> Self {
782        Self {
783            key,
784            station_name: None,
785            upstream_index: None,
786            base_url: None,
787            usage: UsageBucket::default(),
788            balance: None,
789            route_selected: false,
790            route_skip_reasons: BTreeSet::new(),
791        }
792    }
793
794    fn finish(self) -> UsageBalanceEndpointRow {
795        let balance_status = self
796            .balance
797            .as_ref()
798            .map(|balance| balance.status)
799            .unwrap_or(UsageBalanceStatus::Unknown);
800        UsageBalanceEndpointRow {
801            provider_id: self.key.provider_id,
802            endpoint_id: self.key.endpoint_id,
803            station_name: self.station_name,
804            upstream_index: self.upstream_index,
805            base_url: self.base_url,
806            usage: self.usage,
807            balance_status,
808            balance: self.balance,
809            route_selected: self.route_selected,
810            route_skip_reasons: self.route_skip_reasons.into_iter().collect(),
811        }
812    }
813}
814
/// Reduced copy of one routing-explain candidate, as stored in `RouteIndex`.
#[derive(Debug, Clone)]
struct RouteCandidateView {
    /// Provider id of the candidate.
    provider_id: String,
    /// Endpoint id of the candidate.
    endpoint_id: String,
    /// Upstream base URL of the candidate.
    base_url: String,
    /// Whether the candidate was selected.
    selected: bool,
    /// Skip-reason codes (the `code()` of each skip reason).
    skip_reasons: Vec<String>,
    /// `(station_name, upstream_index)` from the candidate's compatibility info, if present.
    compatibility: Option<(String, usize)>,
}
824
/// Lookup tables derived from a routing explanation; empty when none is given.
#[derive(Debug, Clone, Default)]
struct RouteIndex {
    /// Recorded candidates (key format is set in `record_candidate` — NOTE(review):
    /// not visible in this chunk, confirm there).
    candidates: BTreeMap<String, RouteCandidateView>,
    /// Routing impact per provider id.
    provider_impacts: BTreeMap<String, UsageBalanceRouteImpact>,
    /// `(provider_id, upstream_base_url)` -> endpoint id.
    endpoint_by_provider_base_url: BTreeMap<(String, String), String>,
    /// `(provider_id, station_name, upstream_index)` -> endpoint id.
    endpoint_by_provider_station_upstream: BTreeMap<(String, String, usize), String>,
}
832
833impl RouteIndex {
834    fn from_explain(explain: Option<&RoutingExplainResponse>) -> Self {
835        let mut out = Self::default();
836        let Some(explain) = explain else {
837            return out;
838        };
839
840        for candidate in &explain.candidates {
841            out.record_candidate(candidate);
842        }
843        for impact in out.provider_impacts.values_mut() {
844            impact.skip_reasons.sort();
845            impact.skip_reasons.dedup();
846            impact.route_paths.sort();
847            impact.route_paths.dedup();
848        }
849        out
850    }
851
852    fn record_candidate(&mut self, candidate: &RoutingExplainCandidate) {
853        let skip_reasons = candidate
854            .skip_reasons
855            .iter()
856            .map(|reason| reason.code().to_string())
857            .collect::<Vec<_>>();
858        let compatibility = candidate.compatibility.as_ref().map(|compatibility| {
859            (
860                compatibility.station_name.clone(),
861                compatibility.upstream_index,
862            )
863        });
864        let view = RouteCandidateView {
865            provider_id: candidate.provider_id.clone(),
866            endpoint_id: candidate.endpoint_id.clone(),
867            base_url: candidate.upstream_base_url.clone(),
868            selected: candidate.selected,
869            skip_reasons: skip_reasons.clone(),
870            compatibility: compatibility.clone(),
871        };
872
873        self.endpoint_by_provider_base_url.insert(
874            (
875                candidate.provider_id.clone(),
876                candidate.upstream_base_url.clone(),
877            ),
878            candidate.endpoint_id.clone(),
879        );
880        if let Some((station_name, upstream_index)) = compatibility {
881            self.endpoint_by_provider_station_upstream.insert(
882                (candidate.provider_id.clone(), station_name, upstream_index),
883                candidate.endpoint_id.clone(),
884            );
885        }
886
887        let impact = self
888            .provider_impacts
889            .entry(candidate.provider_id.clone())
890            .or_insert_with(|| UsageBalanceRouteImpact {
891                provider_id: candidate.provider_id.clone(),
892                ..UsageBalanceRouteImpact::default()
893            });
894        impact.candidate_count += 1;
895        impact.route_paths.push(candidate.route_path.clone());
896        impact.skip_reasons.extend(skip_reasons);
897        if candidate.selected {
898            impact.selected = true;
899            impact.selected_endpoint_id = Some(candidate.endpoint_id.clone());
900            impact.selected_provider_endpoint_key = Some(candidate.provider_endpoint_key.clone());
901        }
902
903        self.candidates
904            .insert(candidate.provider_endpoint_key.clone(), view);
905    }
906}
907
/// Derives an endpoint id from a base URL.
///
/// The id is the URL's host, including `:port` when present:
/// - an optional leading `scheme://` is removed;
/// - the authority ends at the first `/`, `?` or `#`;
/// - any `user:password@` userinfo is dropped so credentials embedded in the
///   URL never end up in the id;
/// - surrounding whitespace is trimmed.
///
/// Returns the literal `"endpoint"` when no host can be extracted (empty
/// input, or a URL such as `https:///v1`).
fn endpoint_id_from_base_url(base_url: &str) -> String {
    let ends_authority = |c: char| matches!(c, '/' | '?' | '#');

    let trimmed = base_url.trim();
    // `://` only separates a scheme when it precedes any path, query or
    // fragment; otherwise it belongs to e.g. a URL nested in a query string.
    let after_scheme = match trimmed.split_once("://") {
        Some((scheme, rest)) if !scheme.contains(ends_authority) => rest,
        _ => trimmed,
    };

    let authority = after_scheme
        .split(ends_authority)
        .next()
        .unwrap_or_default();
    // Userinfo, if any, is everything up to the last `@` of the authority.
    let host = authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
        .trim();

    if host.is_empty() {
        "endpoint".to_string()
    } else {
        host.to_string()
    }
}
922
/// Returns `Some(value)` unchanged (not trimmed) unless `value` is empty or
/// consists only of whitespace, in which case it returns `None`.
fn non_empty(value: String) -> Option<String> {
    if value.trim().is_empty() {
        None
    } else {
        Some(value)
    }
}
926
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pricing::CostBreakdown;
    use crate::routing_explain::{
        RoutingExplainCandidate, RoutingExplainResponse, RoutingExplainSkipReason,
    };
    use crate::usage::UsageMetrics;

    /// Builds a view for the `codex` service with a 7-day window, generated
    /// at t = 1000 ms and with default refresh input.
    fn build_view(
        usage_rollup: &UsageRollupView,
        provider_balances: &HashMap<String, Vec<ProviderBalanceSnapshot>>,
        recent: &[FinishedRequest],
        routing_explain: Option<&RoutingExplainResponse>,
    ) -> UsageBalanceView {
        let input = UsageBalanceBuildInput {
            service_name: "codex",
            window_days: 7,
            generated_at_ms: 1_000,
            usage_rollup,
            provider_balances,
            recent,
            routing_explain,
            refresh: UsageBalanceRefreshInput::default(),
        };
        UsageBalanceView::build(input)
    }

    /// Otherwise-default balance snapshot for `provider_id`, fetched at t = 900 ms.
    fn snapshot(provider_id: &str) -> ProviderBalanceSnapshot {
        ProviderBalanceSnapshot {
            provider_id: provider_id.to_string(),
            fetched_at_ms: 900,
            ..ProviderBalanceSnapshot::default()
        }
    }

    /// Routing explanation for `codex` that contains exactly one candidate.
    fn explain_with(candidate: RoutingExplainCandidate) -> RoutingExplainResponse {
        RoutingExplainResponse {
            api_version: 1,
            service_name: "codex".to_string(),
            runtime_loaded_at_ms: None,
            request_model: None,
            session_id: None,
            request_context: Default::default(),
            selected_route: None,
            candidates: vec![candidate],
            affinity_policy: "off".to_string(),
            affinity: None,
            conditional_routes: Vec::new(),
        }
    }

    #[test]
    fn balance_status_counts_keep_unknown_stale_exhausted_error_and_unlimited_distinct() {
        let snapshots = vec![
            ProviderBalanceSnapshot {
                exhausted: Some(false),
                total_balance_usd: Some("3".to_string()),
                ..snapshot("ok")
            },
            ProviderBalanceSnapshot {
                exhausted: Some(false),
                unlimited_quota: Some(true),
                ..snapshot("unlimited")
            },
            ProviderBalanceSnapshot {
                exhausted: Some(false),
                stale_after_ms: Some(950),
                ..snapshot("stale")
            },
            ProviderBalanceSnapshot {
                exhausted: Some(true),
                ..snapshot("exhausted")
            },
            ProviderBalanceSnapshot {
                error: Some("lookup failed".to_string()),
                ..snapshot("error")
            },
            snapshot("unknown"),
        ];
        let balances = HashMap::from([("station".to_string(), snapshots)]);

        let view = build_view(&UsageRollupView::default(), &balances, &[], None);

        let counts = &view.totals.balance_status_counts;
        assert_eq!(counts.ok, 1);
        assert_eq!(counts.unlimited, 1);
        assert_eq!(counts.stale, 1);
        assert_eq!(counts.exhausted, 1);
        assert_eq!(counts.error, 1);
        assert_eq!(counts.unknown, 1);

        let status_of = |provider_id: &str| {
            view.provider_rows
                .iter()
                .find(|row| row.provider_id == provider_id)
                .map(|row| row.balance_status)
        };
        assert_eq!(status_of("unlimited"), Some(UsageBalanceStatus::Unlimited));
        assert_eq!(status_of("unknown"), Some(UsageBalanceStatus::Unknown));
    }

    #[test]
    fn provider_rows_prefer_route_selection_then_attention_and_usage() {
        // "cheap" has more traffic, but "selected" is the routed provider.
        let cheap_usage = UsageBucket {
            requests_total: 2,
            ..UsageBucket::default()
        };
        let selected_usage = UsageBucket {
            requests_total: 1,
            ..UsageBucket::default()
        };
        let rollup = UsageRollupView {
            by_provider: vec![
                ("cheap".to_string(), cheap_usage),
                ("selected".to_string(), selected_usage),
            ],
            ..UsageRollupView::default()
        };
        let explain = explain_with(RoutingExplainCandidate {
            provider_id: "selected".to_string(),
            provider_alias: None,
            endpoint_id: "default".to_string(),
            provider_endpoint_key: "codex:selected:default".to_string(),
            route_path: vec!["main".to_string(), "selected".to_string()],
            preference_group: 0,
            compatibility: None,
            upstream_base_url: "https://selected.example/v1".to_string(),
            selected: true,
            skip_reasons: Vec::new(),
        });

        let view = build_view(&rollup, &HashMap::new(), &[], Some(&explain));

        let top = &view.provider_rows[0];
        assert_eq!(top.provider_id, "selected");
        assert!(top.routing.selected);
    }

    #[test]
    fn endpoint_rows_merge_recent_usage_balance_and_route_skip_reasons() {
        let metrics = UsageMetrics {
            input_tokens: 10,
            output_tokens: 20,
            total_tokens: 30,
            ..UsageMetrics::default()
        };
        // A single rate-limited (429) request against the "right" provider.
        let recent = [FinishedRequest {
            id: 1,
            trace_id: None,
            session_id: None,
            client_name: None,
            client_addr: None,
            cwd: None,
            model: Some("gpt-test".to_string()),
            reasoning_effort: None,
            service_tier: None,
            station_name: Some("station-a".to_string()),
            provider_id: Some("right".to_string()),
            upstream_base_url: Some("https://right.example/v1".to_string()),
            route_decision: None,
            usage: Some(metrics),
            cost: CostBreakdown::unknown(),
            retry: None,
            observability: Default::default(),
            service: "codex".to_string(),
            method: "POST".to_string(),
            path: "/v1/responses".to_string(),
            status_code: 429,
            duration_ms: 120,
            ttfb_ms: Some(20),
            streaming: false,
            ended_at_ms: 950,
        }];

        let exhausted_snapshot = ProviderBalanceSnapshot {
            provider_id: "right".to_string(),
            station_name: Some("station-a".to_string()),
            upstream_index: Some(0),
            exhausted: Some(true),
            fetched_at_ms: 940,
            ..ProviderBalanceSnapshot::default()
        };
        let balances = HashMap::from([("station-a".to_string(), vec![exhausted_snapshot])]);

        let explain = explain_with(RoutingExplainCandidate {
            provider_id: "right".to_string(),
            provider_alias: None,
            endpoint_id: "default".to_string(),
            provider_endpoint_key: "codex:right:default".to_string(),
            route_path: vec!["main".to_string(), "right".to_string()],
            preference_group: 0,
            compatibility: Some(crate::routing_explain::RoutingExplainCompatibility {
                station_name: "station-a".to_string(),
                upstream_index: 0,
            }),
            upstream_base_url: "https://right.example/v1".to_string(),
            selected: false,
            skip_reasons: vec![RoutingExplainSkipReason::UsageExhausted],
        });

        let view = build_view(
            &UsageRollupView::default(),
            &balances,
            &recent,
            Some(&explain),
        );
        let endpoint = view
            .endpoint_rows
            .iter()
            .find(|row| row.provider_id == "right")
            .expect("right endpoint");

        assert_eq!(endpoint.endpoint_id, "default");
        assert_eq!(endpoint.usage.requests_total, 1);
        assert_eq!(endpoint.usage.requests_error, 1);
        assert_eq!(endpoint.balance_status, UsageBalanceStatus::Exhausted);
        assert_eq!(endpoint.route_skip_reasons, vec!["usage_exhausted"]);
    }
}