//! codex_helper_core/request_ledger.rs
//!
//! Request-ledger helpers: parsing raw request-log lines, filtering and
//! aggregating usage, and projecting log records back into UI request types.

1use std::collections::HashMap;
2use std::fs::File;
3use std::io::{BufRead, BufReader};
4use std::path::Path;
5
6use serde_json::Value as JsonValue;
7
8pub use crate::logging::request_log_path;
9use crate::pricing::{CostAdjustments, estimate_request_cost_from_operator_catalog_for_service};
10use crate::state::{FinishedRequest, RequestObservability, RouteDecisionProvenance};
11use crate::usage::{CacheInputAccounting, UsageMetrics};
12
/// A single raw line from the request log, paired with its parsed JSON form
/// when the line is valid JSON.
#[derive(Debug, Clone, PartialEq)]
pub struct RequestLogLine {
    // The line exactly as read from the log file.
    raw: String,
    // Parsed form of `raw`; `None` when the line is not valid JSON.
    value: Option<JsonValue>,
}
18
19impl RequestLogLine {
20    pub fn from_raw(raw: impl Into<String>) -> Self {
21        let raw = raw.into();
22        let value = serde_json::from_str::<JsonValue>(&raw).ok();
23        Self { raw, value }
24    }
25
26    pub fn raw(&self) -> &str {
27        &self.raw
28    }
29
30    pub fn value(&self) -> Option<&JsonValue> {
31        self.value.as_ref()
32    }
33
34    pub fn is_valid_json(&self) -> bool {
35        self.value.is_some()
36    }
37
38    pub fn display_lines(&self) -> Vec<String> {
39        self.value
40            .as_ref()
41            .map(format_request_log_record_lines)
42            .unwrap_or_default()
43    }
44}
45
/// Optional criteria for selecting request log records.
///
/// String filters are case-insensitive substring matches; the boolean flags
/// require the corresponding record predicate to hold.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RequestLogFilters {
    /// Substring match against the record's `session_id`.
    pub session: Option<String>,
    /// Substring match against the resolved model name.
    pub model: Option<String>,
    /// Substring match against the station name.
    pub station: Option<String>,
    /// Substring match against the record's `provider_id`.
    pub provider: Option<String>,
    /// Minimum HTTP status code (inclusive).
    pub status_min: Option<u64>,
    /// Maximum HTTP status code (inclusive).
    pub status_max: Option<u64>,
    /// Keep only requests detected as fast-mode.
    pub fast: bool,
    /// Keep only requests that were retried.
    pub retried: bool,
}
57
58impl RequestLogFilters {
59    pub fn is_empty(&self) -> bool {
60        self.session.is_none()
61            && self.model.is_none()
62            && self.station.is_none()
63            && self.provider.is_none()
64            && self.status_min.is_none()
65            && self.status_max.is_none()
66            && !self.fast
67            && !self.retried
68    }
69
70    pub fn matches(&self, record: &JsonValue) -> bool {
71        if let Some(expected) = self.session.as_deref()
72            && !field_contains(str_field(record, "session_id"), expected)
73        {
74            return false;
75        }
76        if let Some(expected) = self.model.as_deref()
77            && !field_contains(request_model(record).as_deref(), expected)
78        {
79            return false;
80        }
81        if let Some(expected) = self.station.as_deref()
82            && !field_contains(Some(station_name(record)), expected)
83        {
84            return false;
85        }
86        if let Some(expected) = self.provider.as_deref()
87            && !field_contains(str_field(record, "provider_id"), expected)
88        {
89            return false;
90        }
91        if let Some(min) = self.status_min
92            && u64_field(record, "status_code").unwrap_or(0) < min
93        {
94            return false;
95        }
96        if let Some(max) = self.status_max
97            && u64_field(record, "status_code").unwrap_or(0) > max
98        {
99            return false;
100        }
101        if self.fast && !request_is_fast(record) {
102            return false;
103        }
104        if self.retried && !request_was_retried(record) {
105            return false;
106        }
107        true
108    }
109}
110
/// Running totals of request count, duration, and token usage for one group.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RequestUsageAggregate {
    /// Number of requests folded into this aggregate.
    pub requests: u64,
    /// Sum of request durations in milliseconds.
    pub duration_ms_total: u64,
    /// Sum of (non-negative) input tokens.
    pub input_tokens: i64,
    /// Sum of (non-negative) output tokens.
    pub output_tokens: i64,
    /// Sum of reasoning-output tokens.
    pub reasoning_tokens: i64,
    /// Sum of cache-read input tokens.
    pub cache_read_input_tokens: i64,
    /// Sum of cache-creation input tokens.
    pub cache_creation_input_tokens: i64,
    /// Sum of per-request totals (reported or reconstructed).
    pub total_tokens: i64,
}
122
123impl RequestUsageAggregate {
124    pub fn record(
125        &mut self,
126        duration_ms: u64,
127        usage: Option<&UsageMetrics>,
128        accounting: CacheInputAccounting,
129    ) {
130        self.requests = self.requests.saturating_add(1);
131        self.duration_ms_total = self.duration_ms_total.saturating_add(duration_ms);
132        let Some(usage) = usage else {
133            return;
134        };
135
136        self.input_tokens = self.input_tokens.saturating_add(usage.input_tokens.max(0));
137        self.output_tokens = self
138            .output_tokens
139            .saturating_add(usage.output_tokens.max(0));
140        self.reasoning_tokens = self
141            .reasoning_tokens
142            .saturating_add(reasoning_tokens(usage));
143        self.cache_read_input_tokens = self
144            .cache_read_input_tokens
145            .saturating_add(usage.cache_read_tokens_total());
146        self.cache_creation_input_tokens = self
147            .cache_creation_input_tokens
148            .saturating_add(cache_creation_tokens(usage));
149        self.total_tokens = self
150            .total_tokens
151            .saturating_add(total_tokens(usage, accounting));
152    }
153
154    pub fn average_duration_ms(&self) -> u64 {
155        self.duration_ms_total
156            .checked_div(self.requests)
157            .unwrap_or(0)
158    }
159
160    pub fn summary_line(&self, station_name: &str) -> String {
161        format!(
162            "{} | {} | {} | {} | {} | {} | {} | {} | {}",
163            station_name,
164            self.requests,
165            self.input_tokens,
166            self.output_tokens,
167            self.cache_read_input_tokens,
168            self.cache_creation_input_tokens,
169            self.reasoning_tokens,
170            self.total_tokens,
171            self.average_duration_ms()
172        )
173    }
174}
175
/// Which record field usage summaries are grouped by.
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum RequestUsageSummaryGroup {
    /// Group by station name (the default).
    #[default]
    Station,
    /// Group by provider id.
    Provider,
    /// Group by resolved model name.
    Model,
    /// Group by session id.
    Session,
}
185
186impl RequestUsageSummaryGroup {
187    pub fn column_name(self) -> &'static str {
188        match self {
189            Self::Station => "station_name",
190            Self::Provider => "provider_id",
191            Self::Model => "model",
192            Self::Session => "session_id",
193        }
194    }
195
196    fn key(self, record: &JsonValue) -> String {
197        match self {
198            Self::Station => station_name(record).to_string(),
199            Self::Provider => str_field(record, "provider_id").unwrap_or("-").to_string(),
200            Self::Model => request_model(record).unwrap_or_else(|| "-".to_string()),
201            Self::Session => str_field(record, "session_id").unwrap_or("-").to_string(),
202        }
203    }
204}
205
/// One row of a usage summary: the group key plus its aggregated totals.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RequestUsageSummaryRow {
    /// Value of the grouping column (station, provider, model, or session).
    pub group_value: String,
    /// Aggregated usage for every record in this group.
    pub aggregate: RequestUsageAggregate,
}
211
212pub fn read_request_log_lines(path: &Path) -> std::io::Result<Vec<RequestLogLine>> {
213    let file = File::open(path)?;
214    let reader = BufReader::new(file);
215    Ok(reader
216        .lines()
217        .map_while(Result::ok)
218        .map(RequestLogLine::from_raw)
219        .collect())
220}
221
222pub fn tail_request_log(path: &Path, limit: usize) -> std::io::Result<Vec<RequestLogLine>> {
223    let lines = read_request_log_lines(path)?;
224    let total = lines.len();
225    let start = total.saturating_sub(limit);
226    Ok(lines[start..].to_vec())
227}
228
229pub fn tail_finished_requests_from_log(
230    path: &Path,
231    limit: usize,
232) -> std::io::Result<Vec<FinishedRequest>> {
233    let lines = tail_request_log(path, limit)?;
234    let mut requests = lines
235        .iter()
236        .filter_map(|line| {
237            line.value()
238                .and_then(finished_request_from_request_log_record)
239        })
240        .collect::<Vec<_>>();
241    requests.reverse();
242    Ok(requests)
243}
244
245pub fn find_finished_requests_from_log(
246    path: &Path,
247    filters: &RequestLogFilters,
248    limit: usize,
249) -> std::io::Result<Vec<FinishedRequest>> {
250    let lines = find_request_log(path, filters, limit)?;
251    Ok(lines
252        .iter()
253        .filter_map(|line| {
254            line.value()
255                .and_then(finished_request_from_request_log_record)
256        })
257        .collect())
258}
259
260pub fn summarize_request_log(
261    path: &Path,
262    group: RequestUsageSummaryGroup,
263    filters: &RequestLogFilters,
264    limit: usize,
265) -> std::io::Result<Vec<RequestUsageSummaryRow>> {
266    let lines = read_request_log_lines(path)?;
267    Ok(summarize_request_log_lines(
268        lines.iter(),
269        group,
270        filters,
271        limit,
272    ))
273}
274
275pub fn find_request_log(
276    path: &Path,
277    filters: &RequestLogFilters,
278    limit: usize,
279) -> std::io::Result<Vec<RequestLogLine>> {
280    let lines = read_request_log_lines(path)?;
281    Ok(lines
282        .iter()
283        .rev()
284        .filter(|line| line.value().is_some_and(|record| filters.matches(record)))
285        .take(limit)
286        .cloned()
287        .collect())
288}
289
/// Aggregates filtered log lines into per-group usage rows.
///
/// Rows are sorted by total tokens (descending), tie-broken by group value
/// (ascending), then truncated to `limit`.
fn summarize_request_log_lines<'a>(
    lines: impl IntoIterator<Item = &'a RequestLogLine>,
    group: RequestUsageSummaryGroup,
    filters: &RequestLogFilters,
    limit: usize,
) -> Vec<RequestUsageSummaryRow> {
    let mut aggregate: HashMap<String, RequestUsageAggregate> = HashMap::new();
    for line in lines {
        // Lines that failed JSON parsing carry no record and are skipped.
        let Some(record) = line.value() else {
            continue;
        };
        if !filters.matches(record) {
            continue;
        }
        let group_value = group.key(record);
        let duration_ms = u64_field(record, "duration_ms").unwrap_or(0);
        // The service decides how cache tokens are accounted for in totals.
        let service = str_field(record, "service").unwrap_or("-");
        let usage = usage_metrics(record);
        let entry = aggregate.entry(group_value).or_default();
        entry.record(
            duration_ms,
            usage.as_ref(),
            CacheInputAccounting::for_service(service),
        );
    }

    let mut items: Vec<RequestUsageSummaryRow> = aggregate
        .into_iter()
        .map(|(group_value, aggregate)| RequestUsageSummaryRow {
            group_value,
            aggregate,
        })
        .collect();
    // Heaviest token consumers first; alphabetical tie-break keeps output stable.
    items.sort_by(|a, b| {
        b.aggregate
            .total_tokens
            .cmp(&a.aggregate.total_tokens)
            .then_with(|| a.group_value.cmp(&b.group_value))
    });
    items.into_iter().take(limit).collect()
}
331
/// Renders a parsed request log record into human-readable display lines:
/// a header line, a timing line, and a token line.
pub fn format_request_log_record_lines(record: &JsonValue) -> Vec<String> {
    // Missing fields fall back to 0 / "-" so a partial record still renders.
    let ts = i64_field(record, "timestamp_ms").unwrap_or(0);
    let service = str_field(record, "service").unwrap_or("-");
    let method = str_field(record, "method").unwrap_or("-");
    let path = str_field(record, "path").unwrap_or("-");
    let status = u64_field(record, "status_code").unwrap_or(0);
    let provider = str_field(record, "provider_id").unwrap_or("-");
    let endpoint = provider_endpoint_display(record);
    let station = station_name(record);
    let model = request_model(record).unwrap_or_else(|| "-".to_string());
    let tier = service_tier_display(record);

    let mut lines = vec![format!(
        "[{}] {} {} {} status={} endpoint={} provider={} station={} model={} tier={}",
        ts, service, method, path, status, endpoint, provider, station, model, tier
    )];

    let duration_ms = u64_field(record, "duration_ms").unwrap_or(0);
    let ttfb_ms = u64_field(record, "ttfb_ms");
    let usage = usage_metrics(record);
    // Output speed is only computed when usage metrics are available.
    let speed = usage
        .as_ref()
        .and_then(|usage| output_tokens_per_second(usage, duration_ms, ttfb_ms));
    let cost = request_cost_display(service, model.as_str(), usage.as_ref());

    lines.push(format!(
        "    timing duration={} ttfb={} output_speed={} cost={}",
        format_ms(duration_ms),
        format_optional_ms(ttfb_ms),
        format_optional_speed(speed),
        cost
    ));

    if let Some(usage) = usage.as_ref() {
        lines.push(format!(
            "    tokens input={} output={} cache_read={} cache_create={} reasoning={} total={}",
            usage.input_tokens.max(0),
            usage.output_tokens.max(0),
            usage.cache_read_tokens_total(),
            cache_creation_tokens(usage),
            reasoning_tokens(usage),
            total_tokens(usage, CacheInputAccounting::for_service(service))
        ));
    } else {
        // No usage object at all: emit a placeholder token line.
        lines.push("    tokens -".to_string());
    }

    lines
}
381
/// Public wrapper: best-effort model name for a log record (see `request_model`).
pub fn request_log_record_model(record: &JsonValue) -> Option<String> {
    request_model(record)
}
385
/// Public wrapper: station name for a log record, "-" when absent.
pub fn request_log_record_station(record: &JsonValue) -> &str {
    station_name(record)
}
389
/// Public wrapper: whether the record was a fast-mode request.
pub fn request_log_record_is_fast(record: &JsonValue) -> bool {
    request_is_fast(record)
}
393
/// Public wrapper: whether the record shows evidence of retries.
pub fn request_log_record_was_retried(record: &JsonValue) -> bool {
    request_was_retried(record)
}
397
/// Projects a parsed request log record into a [`FinishedRequest`] so the UI
/// can reuse the same rendering path as live requests.
///
/// As written this always returns `Some`; the `Option` return leaves room for
/// records that cannot be projected in the future.
pub fn finished_request_from_request_log_record(record: &JsonValue) -> Option<FinishedRequest> {
    let timestamp_ms = u64_field(record, "timestamp_ms").unwrap_or(0);
    // Fall back to the timestamp as an id for records that predate request_id.
    let request_id = u64_field(record, "request_id").unwrap_or(timestamp_ms);
    // Out-of-range status codes collapse to 0 rather than failing projection.
    let status_code = u64_field(record, "status_code")
        .and_then(|status| u16::try_from(status).ok())
        .unwrap_or(0);
    let duration_ms = u64_field(record, "duration_ms").unwrap_or(0);
    let usage = usage_metrics(record);
    let model = request_model(record);
    let service = str_field(record, "service").unwrap_or("-");
    // Re-estimate cost from the operator catalog; the log does not store it.
    let cost = estimate_request_cost_from_operator_catalog_for_service(
        model.as_deref(),
        usage.as_ref(),
        CostAdjustments::default(),
        service,
    );
    // Retry / routing provenance are best-effort deserializations.
    let retry = record
        .get("retry")
        .and_then(|retry| serde_json::from_value(retry.clone()).ok());
    let route_decision = record.get("route_decision").and_then(|route_decision| {
        serde_json::from_value::<RouteDecisionProvenance>(route_decision.clone()).ok()
    });

    let mut request = FinishedRequest {
        id: request_id,
        trace_id: str_field(record, "trace_id").map(ToOwned::to_owned),
        session_id: str_field(record, "session_id").map(ToOwned::to_owned),
        client_name: str_field(record, "client_name").map(ToOwned::to_owned),
        client_addr: str_field(record, "client_addr").map(ToOwned::to_owned),
        cwd: str_field(record, "cwd").map(ToOwned::to_owned),
        model,
        reasoning_effort: str_field(record, "reasoning_effort").map(ToOwned::to_owned),
        service_tier: service_tier_value(record),
        // "-" is the log's placeholder for "no station"; map it back to None.
        station_name: non_dash(station_name(record)).map(ToOwned::to_owned),
        provider_id: str_field(record, "provider_id").map(ToOwned::to_owned),
        upstream_base_url: str_field(record, "upstream_base_url").map(ToOwned::to_owned),
        route_decision,
        usage,
        cost,
        retry,
        observability: RequestObservability::default(),
        service: service.to_string(),
        method: str_field(record, "method").unwrap_or("-").to_string(),
        path: str_field(record, "path").unwrap_or("-").to_string(),
        status_code,
        duration_ms,
        ttfb_ms: u64_field(record, "ttfb_ms"),
        streaming: record
            .get("streaming")
            .and_then(|value| value.as_bool())
            .unwrap_or(false),
        ended_at_ms: timestamp_ms,
    };
    // Derive observability flags (fast mode, attempt counts, …) from the data.
    request.refresh_observability();
    Some(request)
}
454
/// Case-insensitive substring match of `expected` within `value`.
///
/// An `expected` that trims to empty matches everything; a missing `value`
/// matches nothing (unless `expected` is empty).
fn field_contains(value: Option<&str>, expected: &str) -> bool {
    let needle = expected.trim().to_ascii_lowercase();
    if needle.is_empty() {
        return true;
    }
    match value {
        Some(haystack) => haystack.to_ascii_lowercase().contains(&needle),
        None => false,
    }
}
464
465fn request_is_fast(record: &JsonValue) -> bool {
466    record
467        .get("observability")
468        .and_then(|observability| observability.get("fast_mode"))
469        .and_then(|value| value.as_bool())
470        .unwrap_or(false)
471        || service_tier_display(record).eq_ignore_ascii_case("priority(fast)")
472}
473
474fn request_was_retried(record: &JsonValue) -> bool {
475    if record
476        .get("observability")
477        .and_then(|observability| observability.get("retried"))
478        .and_then(|value| value.as_bool())
479        .unwrap_or(false)
480    {
481        return true;
482    }
483    if record
484        .get("observability")
485        .and_then(|observability| observability.get("attempt_count"))
486        .and_then(|value| value.as_u64())
487        .is_some_and(|attempts| attempts > 1)
488    {
489        return true;
490    }
491    if record
492        .get("retry")
493        .and_then(|retry| retry.get("route_attempts"))
494        .and_then(|attempts| attempts.as_array())
495        .is_some_and(|attempts| attempts.len() > 1)
496    {
497        return true;
498    }
499    if record
500        .get("retry")
501        .and_then(|retry| retry.get("attempts"))
502        .and_then(|attempts| attempts.as_u64())
503        .is_some_and(|attempts| attempts > 1)
504    {
505        return true;
506    }
507    record
508        .get("retry")
509        .and_then(|retry| retry.get("upstream_chain"))
510        .and_then(|attempts| attempts.as_array())
511        .is_some_and(|attempts| attempts.len() > 1)
512}
513
/// Deserializes the record's `usage` object into [`UsageMetrics`], when
/// present and well-formed.
fn usage_metrics(record: &JsonValue) -> Option<UsageMetrics> {
    record
        .get("usage")
        .and_then(|usage| serde_json::from_value::<UsageMetrics>(usage.clone()).ok())
}
519
520fn request_cost_display(service: &str, model: &str, usage: Option<&UsageMetrics>) -> String {
521    let model = model.trim();
522    if model.is_empty() || model == "-" {
523        return "-".to_string();
524    }
525    let cost = estimate_request_cost_from_operator_catalog_for_service(
526        Some(model),
527        usage,
528        CostAdjustments::default(),
529        service,
530    );
531    cost.display_total_with_confidence()
532}
533
534fn output_tokens_per_second(
535    usage: &UsageMetrics,
536    duration_ms: u64,
537    ttfb_ms: Option<u64>,
538) -> Option<f64> {
539    let output_tokens = usage.output_tokens.max(0);
540    if output_tokens == 0 || duration_ms == 0 {
541        return None;
542    }
543    let generation_ms = match ttfb_ms {
544        Some(ttfb) if ttfb > 0 && ttfb < duration_ms => duration_ms.saturating_sub(ttfb),
545        _ => duration_ms,
546    };
547    if generation_ms == 0 {
548        return None;
549    }
550    Some(output_tokens as f64 / (generation_ms as f64 / 1000.0))
551}
552
553fn request_model(record: &JsonValue) -> Option<String> {
554    str_field(record, "model")
555        .map(ToOwned::to_owned)
556        .or_else(|| nested_str(record, &["route_decision", "effective_model", "value"]))
557        .or_else(|| model_from_route_attempts(record))
558        .or_else(|| model_from_legacy_retry_chain(record))
559}
560
561fn model_from_route_attempts(record: &JsonValue) -> Option<String> {
562    record
563        .get("retry")
564        .and_then(|retry| retry.get("route_attempts"))
565        .and_then(|attempts| attempts.as_array())
566        .and_then(|attempts| {
567            attempts
568                .iter()
569                .rev()
570                .filter(|attempt| {
571                    !attempt
572                        .get("skipped")
573                        .and_then(|value| value.as_bool())
574                        .unwrap_or(false)
575                })
576                .find_map(|attempt| str_field(attempt, "model").map(ToOwned::to_owned))
577        })
578}
579
580fn model_from_legacy_retry_chain(record: &JsonValue) -> Option<String> {
581    record
582        .get("retry")
583        .and_then(|retry| retry.get("upstream_chain"))
584        .and_then(|chain| chain.as_array())
585        .and_then(|chain| {
586            chain
587                .iter()
588                .rev()
589                .filter_map(|entry| entry.as_str())
590                .find_map(|entry| raw_kv_field(entry, "model"))
591        })
592}
593
/// Extracts `key=value` from a whitespace-separated raw string.
///
/// Only the first token starting with `key=` is considered; an empty or "-"
/// value yields `None`. A trailing comma on the value is stripped.
fn raw_kv_field(raw: &str, key: &str) -> Option<String> {
    let prefix = format!("{key}=");
    for token in raw.split_whitespace() {
        let Some(rest) = token.strip_prefix(&prefix) else {
            continue;
        };
        let value = rest.trim().trim_matches(',');
        if value.is_empty() || value == "-" {
            return None;
        }
        return Some(value.to_string());
    }
    None
}
601
602fn service_tier_display(record: &JsonValue) -> String {
603    let tier = service_tier_value(record).unwrap_or_else(|| "-".to_string());
604    if tier.eq_ignore_ascii_case("priority") {
605        "priority(fast)".to_string()
606    } else {
607        tier
608    }
609}
610
611fn service_tier_value(record: &JsonValue) -> Option<String> {
612    record.get("service_tier").and_then(|tier| {
613        tier.as_str()
614            .map(ToOwned::to_owned)
615            .or_else(|| service_tier_log_value(tier))
616    })
617}
618
619fn service_tier_log_value(tier: &JsonValue) -> Option<String> {
620    ["actual", "effective", "requested"]
621        .iter()
622        .find_map(|key| str_field(tier, key).map(ToOwned::to_owned))
623}
624
625fn station_name(record: &JsonValue) -> &str {
626    str_field(record, "station_name")
627        .or_else(|| str_field(record, "config_name"))
628        .unwrap_or("-")
629}
630
631fn provider_endpoint_display(record: &JsonValue) -> String {
632    str_field(record, "provider_endpoint_key")
633        .map(ToOwned::to_owned)
634        .or_else(|| {
635            let provider = str_field(record, "provider_id")?;
636            let endpoint = str_field(record, "endpoint_id")?;
637            Some(format!("{provider}.{endpoint}"))
638        })
639        .unwrap_or_else(|| "-".to_string())
640}
641
/// Maps the log's "-" placeholder to `None`; any other value passes through.
fn non_dash(value: &str) -> Option<&str> {
    if value == "-" { None } else { Some(value) }
}
645
646fn str_field<'a>(record: &'a JsonValue, key: &str) -> Option<&'a str> {
647    record
648        .get(key)
649        .and_then(|value| value.as_str())
650        .map(str::trim)
651        .filter(|value| !value.is_empty())
652}
653
654fn nested_str(record: &JsonValue, path: &[&str]) -> Option<String> {
655    let mut current = record;
656    for key in path {
657        current = current.get(*key)?;
658    }
659    current
660        .as_str()
661        .map(str::trim)
662        .filter(|value| !value.is_empty())
663        .map(ToOwned::to_owned)
664}
665
666fn i64_field(record: &JsonValue, key: &str) -> Option<i64> {
667    record.get(key).and_then(|value| value.as_i64())
668}
669
670fn u64_field(record: &JsonValue, key: &str) -> Option<u64> {
671    record.get(key).and_then(|value| value.as_u64())
672}
673
674fn reasoning_tokens(usage: &UsageMetrics) -> i64 {
675    usage.reasoning_output_tokens_total().max(0)
676}
677
678fn cache_creation_tokens(usage: &UsageMetrics) -> i64 {
679    usage.cache_creation_tokens_total().max(0)
680}
681
/// Total token count for a request: trusts the reported `total_tokens` when
/// positive, otherwise reconstructs it from the cache-aware usage breakdown.
fn total_tokens(usage: &UsageMetrics, accounting: CacheInputAccounting) -> i64 {
    if usage.total_tokens > 0 {
        usage.total_tokens
    } else {
        // Reconstructed total = effective input + output + cache read + cache creation,
        // with saturating adds to guard against overflow.
        let breakdown = usage.cache_usage_breakdown(accounting);
        breakdown
            .effective_input_tokens
            .saturating_add(usage.output_tokens.max(0))
            .saturating_add(breakdown.cache_read_input_tokens)
            .saturating_add(breakdown.cache_creation_input_tokens)
    }
}
694
/// Renders a millisecond count as e.g. "1500ms".
fn format_ms(value: u64) -> String {
    let mut text = value.to_string();
    text.push_str("ms");
    text
}
698
699fn format_optional_ms(value: Option<u64>) -> String {
700    value.map(format_ms).unwrap_or_else(|| "-".to_string())
701}
702
/// Renders an optional token rate as e.g. "20.00 tok/s", using "-" when absent.
fn format_optional_speed(value: Option<f64>) -> String {
    match value {
        Some(speed) => format!("{speed:.2} tok/s"),
        None => "-".to_string(),
    }
}
708
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Rendering: header/timing/token lines pick up routed model, fast tier,
    // cache token fields, and TTFB-adjusted output speed (30 tok / 1.5s = 20).
    #[test]
    fn display_lines_include_route_model_fast_cache_and_speed() {
        let record = json!({
            "timestamp_ms": 123,
            "service": "codex",
            "method": "POST",
            "path": "/v1/responses",
            "status_code": 200,
            "duration_ms": 2000,
            "ttfb_ms": 500,
            "provider_id": "relay",
            "endpoint_id": "default",
            "provider_endpoint_key": "codex/relay/default",
            "service_tier": { "effective": "priority" },
            "usage": {
                "input_tokens": 1000,
                "output_tokens": 30,
                "cache_read_input_tokens": 10,
                "cache_creation_5m_input_tokens": 5,
                "reasoning_output_tokens": 7,
                "total_tokens": 1045
            },
            "retry": {
                "route_attempts": [
                    { "decision": "completed", "model": "gpt-5" }
                ]
            }
        });

        let lines = format_request_log_record_lines(&record);

        assert!(lines[0].contains("endpoint=codex/relay/default"));
        assert!(lines[0].contains("station=-"));
        assert!(lines[0].contains("model=gpt-5"));
        assert!(lines[0].contains("tier=priority(fast)"));
        assert!(lines[1].contains("output_speed=20.00 tok/s"));
        assert!(lines[1].contains("cost="));
        assert!(lines[2].contains("cache_read=10"));
        assert!(lines[2].contains("cache_create=5"));
        assert!(lines[2].contains("reasoning=7"));
    }

    // Model resolution: falls back to parsing `model=` out of the legacy
    // upstream_chain strings.
    #[test]
    fn request_model_reads_legacy_retry_chain_model() {
        let record = json!({
            "retry": {
                "upstream_chain": [
                    "main:https://relay.example/v1 (idx=0) status=200 class=- model=gpt-5.4-mini"
                ]
            }
        });

        assert_eq!(request_model(&record).as_deref(), Some("gpt-5.4-mini"));
    }

    // Aggregation: a usage-less request still counts toward request count and
    // average duration; cache columns appear in the summary line.
    #[test]
    fn usage_aggregate_summary_includes_cache_and_average_duration() {
        let mut aggregate = RequestUsageAggregate::default();
        aggregate.record(
            200,
            Some(&UsageMetrics {
                input_tokens: 10,
                output_tokens: 5,
                cache_read_input_tokens: 3,
                cache_creation_1h_input_tokens: 2,
                reasoning_output_tokens: 4,
                ..UsageMetrics::default()
            }),
            CacheInputAccounting::default(),
        );
        aggregate.record(400, None, CacheInputAccounting::default());

        assert_eq!(
            aggregate.summary_line("main"),
            "main | 2 | 10 | 5 | 3 | 2 | 4 | 20 | 300"
        );
    }

    // Filters: every criterion (substring fields, status range, fast, retried)
    // is satisfied by this record simultaneously.
    #[test]
    fn filters_match_route_model_fast_retry_and_status() {
        let record = json!({
            "session_id": "sid-abc",
            "station_name": "main-station",
            "provider_id": "relay-one",
            "status_code": 429,
            "service_tier": { "actual": "priority" },
            "observability": {
                "attempt_count": 2,
                "retried": true
            },
            "retry": {
                "route_attempts": [
                    { "decision": "failed_status", "model": "gpt-5.4-high" },
                    { "decision": "completed", "model": "gpt-5.4-high" }
                ]
            }
        });

        let filters = RequestLogFilters {
            session: Some("abc".to_string()),
            model: Some("5.4".to_string()),
            station: Some("main".to_string()),
            provider: Some("relay".to_string()),
            status_min: Some(400),
            status_max: Some(499),
            fast: true,
            retried: true,
        };

        assert!(filters.matches(&record));
    }

    // Filters: a non-matching model substring rejects the record.
    #[test]
    fn filters_reject_nonmatching_model() {
        let record = json!({
            "model": "gpt-5.4-high",
            "status_code": 200
        });
        let filters = RequestLogFilters {
            model: Some("mini".to_string()),
            ..RequestLogFilters::default()
        };

        assert!(!filters.matches(&record));
    }

    // Invalid-JSON lines are preserved as raw text but contribute nothing to
    // the usage summary.
    #[test]
    fn tail_keeps_invalid_raw_lines_but_summary_ignores_them() {
        let lines = [
            RequestLogLine::from_raw(
                r#"{"station_name":"a","duration_ms":100,"usage":{"total_tokens":7}}"#,
            ),
            RequestLogLine::from_raw("not-json"),
            RequestLogLine::from_raw(
                r#"{"station_name":"b","duration_ms":200,"usage":{"total_tokens":3}}"#,
            ),
        ];

        assert!(!lines[1].is_valid_json());
        let rows = summarize_request_log_lines(
            lines.iter(),
            RequestUsageSummaryGroup::Station,
            &RequestLogFilters::default(),
            10,
        );

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].group_value, "a");
        assert_eq!(rows[0].aggregate.total_tokens, 7);
        assert_eq!(rows[1].group_value, "b");
        assert_eq!(rows[1].aggregate.total_tokens, 3);
    }

    // Grouping: the same lines summarize correctly by provider, by model
    // (with a status filter applied), and by session.
    #[test]
    fn summary_can_group_by_provider_model_or_session_with_filters() {
        let lines = [
            RequestLogLine::from_raw(
                r#"{"session_id":"sid-a","station_name":"s1","provider_id":"p1","model":"gpt-5","status_code":200,"usage":{"total_tokens":7}}"#,
            ),
            RequestLogLine::from_raw(
                r#"{"session_id":"sid-b","station_name":"s1","provider_id":"p1","model":"gpt-5.4","status_code":429,"usage":{"total_tokens":11}}"#,
            ),
            RequestLogLine::from_raw(
                r#"{"session_id":"sid-b","station_name":"s2","provider_id":"p2","model":"gpt-5.4","status_code":200,"usage":{"total_tokens":3}}"#,
            ),
        ];

        let provider_rows = summarize_request_log_lines(
            lines.iter(),
            RequestUsageSummaryGroup::Provider,
            &RequestLogFilters::default(),
            10,
        );
        assert_eq!(provider_rows[0].group_value, "p1");
        assert_eq!(provider_rows[0].aggregate.total_tokens, 18);
        assert_eq!(provider_rows[1].group_value, "p2");

        let model_rows = summarize_request_log_lines(
            lines.iter(),
            RequestUsageSummaryGroup::Model,
            &RequestLogFilters {
                status_min: Some(400),
                ..RequestLogFilters::default()
            },
            10,
        );
        assert_eq!(model_rows.len(), 1);
        assert_eq!(model_rows[0].group_value, "gpt-5.4");
        assert_eq!(model_rows[0].aggregate.total_tokens, 11);

        let session_rows = summarize_request_log_lines(
            lines.iter(),
            RequestUsageSummaryGroup::Session,
            &RequestLogFilters::default(),
            10,
        );
        assert_eq!(session_rows[0].group_value, "sid-b");
        assert_eq!(session_rows[0].aggregate.requests, 2);
    }

    // Projection: a full log record round-trips into a FinishedRequest with
    // derived observability (fast mode, attempt count, output speed).
    #[test]
    fn request_log_record_projects_to_finished_request_for_ui_reuse() {
        let record = json!({
            "timestamp_ms": 1234,
            "request_id": 42,
            "trace_id": "codex-42",
            "service": "codex",
            "method": "POST",
            "path": "/v1/responses",
            "status_code": 200,
            "duration_ms": 1500,
            "ttfb_ms": 500,
            "station_name": "primary",
            "provider_id": "relay",
            "upstream_base_url": "https://relay.example/v1",
            "session_id": "sid-a",
            "reasoning_effort": "medium",
            "service_tier": { "actual": "priority" },
            "usage": {
                "input_tokens": 100,
                "output_tokens": 50,
                "total_tokens": 150
            },
            "retry": {
                "attempts": 2,
                "upstream_chain": [
                    "primary:https://relay.example/v1 (idx=0) status=429 class=rate_limit model=gpt-5.4",
                    "primary:https://relay.example/v1 (idx=1) status=200 class=- model=gpt-5.4"
                ]
            }
        });

        let request =
            finished_request_from_request_log_record(&record).expect("finished request projection");

        assert_eq!(request.id, 42);
        assert_eq!(request.trace_id.as_deref(), Some("codex-42"));
        assert_eq!(request.session_id.as_deref(), Some("sid-a"));
        assert_eq!(request.model.as_deref(), Some("gpt-5.4"));
        assert_eq!(request.service_tier.as_deref(), Some("priority"));
        assert!(request.is_fast_mode());
        assert_eq!(request.attempt_count(), 2);
        assert_eq!(request.output_tokens_per_second(), Some(50.0));
        assert_eq!(request.ended_at_ms, 1234);
    }
}
959}