1use std::collections::HashMap;
2use std::fs::File;
3use std::io::{BufRead, BufReader};
4use std::path::Path;
5
6use serde_json::Value as JsonValue;
7
8pub use crate::logging::request_log_path;
9use crate::pricing::{CostAdjustments, estimate_request_cost_from_operator_catalog_for_service};
10use crate::state::{FinishedRequest, RequestObservability, RouteDecisionProvenance};
11use crate::usage::{CacheInputAccounting, UsageMetrics};
12
/// One line read from the request log: the verbatim text plus the result of
/// eagerly parsing it as JSON (see `from_raw`).
#[derive(Debug, Clone, PartialEq)]
pub struct RequestLogLine {
    // Original line text, kept even when it is not valid JSON.
    raw: String,
    // Parsed record; `None` when `raw` failed to parse as JSON.
    value: Option<JsonValue>,
}
18
19impl RequestLogLine {
20 pub fn from_raw(raw: impl Into<String>) -> Self {
21 let raw = raw.into();
22 let value = serde_json::from_str::<JsonValue>(&raw).ok();
23 Self { raw, value }
24 }
25
26 pub fn raw(&self) -> &str {
27 &self.raw
28 }
29
30 pub fn value(&self) -> Option<&JsonValue> {
31 self.value.as_ref()
32 }
33
34 pub fn is_valid_json(&self) -> bool {
35 self.value.is_some()
36 }
37
38 pub fn display_lines(&self) -> Vec<String> {
39 self.value
40 .as_ref()
41 .map(format_request_log_record_lines)
42 .unwrap_or_default()
43 }
44}
45
/// Filter criteria for request-log records; unset fields match everything.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RequestLogFilters {
    // Case-insensitive substring match against the record's `session_id`.
    pub session: Option<String>,
    // Case-insensitive substring match against the resolved request model.
    pub model: Option<String>,
    // Case-insensitive substring match against the station name.
    pub station: Option<String>,
    // Case-insensitive substring match against the record's `provider_id`.
    pub provider: Option<String>,
    // Inclusive lower bound on `status_code` (missing codes count as 0).
    pub status_min: Option<u64>,
    // Inclusive upper bound on `status_code` (missing codes count as 0).
    pub status_max: Option<u64>,
    // When set, keep only fast-mode / priority-tier requests.
    pub fast: bool,
    // When set, keep only requests that show evidence of a retry.
    pub retried: bool,
}
57
58impl RequestLogFilters {
59 pub fn is_empty(&self) -> bool {
60 self.session.is_none()
61 && self.model.is_none()
62 && self.station.is_none()
63 && self.provider.is_none()
64 && self.status_min.is_none()
65 && self.status_max.is_none()
66 && !self.fast
67 && !self.retried
68 }
69
70 pub fn matches(&self, record: &JsonValue) -> bool {
71 if let Some(expected) = self.session.as_deref()
72 && !field_contains(str_field(record, "session_id"), expected)
73 {
74 return false;
75 }
76 if let Some(expected) = self.model.as_deref()
77 && !field_contains(request_model(record).as_deref(), expected)
78 {
79 return false;
80 }
81 if let Some(expected) = self.station.as_deref()
82 && !field_contains(Some(station_name(record)), expected)
83 {
84 return false;
85 }
86 if let Some(expected) = self.provider.as_deref()
87 && !field_contains(str_field(record, "provider_id"), expected)
88 {
89 return false;
90 }
91 if let Some(min) = self.status_min
92 && u64_field(record, "status_code").unwrap_or(0) < min
93 {
94 return false;
95 }
96 if let Some(max) = self.status_max
97 && u64_field(record, "status_code").unwrap_or(0) > max
98 {
99 return false;
100 }
101 if self.fast && !request_is_fast(record) {
102 return false;
103 }
104 if self.retried && !request_was_retried(record) {
105 return false;
106 }
107 true
108 }
109}
110
/// Token and duration totals accumulated over a set of requests
/// (see `RequestUsageAggregate::record`). All additions saturate.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RequestUsageAggregate {
    // Number of requests folded in, including ones without usage data.
    pub requests: u64,
    // Sum of request durations in milliseconds.
    pub duration_ms_total: u64,
    // Input tokens (clamped non-negative before summing).
    pub input_tokens: i64,
    // Output tokens (clamped non-negative before summing).
    pub output_tokens: i64,
    // Reasoning output tokens.
    pub reasoning_tokens: i64,
    // Cache-read input tokens.
    pub cache_read_input_tokens: i64,
    // Cache-creation input tokens.
    pub cache_creation_input_tokens: i64,
    // Total tokens per request (explicit total, or reconstructed).
    pub total_tokens: i64,
}
122
123impl RequestUsageAggregate {
124 pub fn record(
125 &mut self,
126 duration_ms: u64,
127 usage: Option<&UsageMetrics>,
128 accounting: CacheInputAccounting,
129 ) {
130 self.requests = self.requests.saturating_add(1);
131 self.duration_ms_total = self.duration_ms_total.saturating_add(duration_ms);
132 let Some(usage) = usage else {
133 return;
134 };
135
136 self.input_tokens = self.input_tokens.saturating_add(usage.input_tokens.max(0));
137 self.output_tokens = self
138 .output_tokens
139 .saturating_add(usage.output_tokens.max(0));
140 self.reasoning_tokens = self
141 .reasoning_tokens
142 .saturating_add(reasoning_tokens(usage));
143 self.cache_read_input_tokens = self
144 .cache_read_input_tokens
145 .saturating_add(usage.cache_read_tokens_total());
146 self.cache_creation_input_tokens = self
147 .cache_creation_input_tokens
148 .saturating_add(cache_creation_tokens(usage));
149 self.total_tokens = self
150 .total_tokens
151 .saturating_add(total_tokens(usage, accounting));
152 }
153
154 pub fn average_duration_ms(&self) -> u64 {
155 self.duration_ms_total
156 .checked_div(self.requests)
157 .unwrap_or(0)
158 }
159
160 pub fn summary_line(&self, station_name: &str) -> String {
161 format!(
162 "{} | {} | {} | {} | {} | {} | {} | {} | {}",
163 station_name,
164 self.requests,
165 self.input_tokens,
166 self.output_tokens,
167 self.cache_read_input_tokens,
168 self.cache_creation_input_tokens,
169 self.reasoning_tokens,
170 self.total_tokens,
171 self.average_duration_ms()
172 )
173 }
174}
175
/// Dimension a usage summary is grouped by (serialized in snake_case).
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum RequestUsageSummaryGroup {
    // Group by station name (default).
    #[default]
    Station,
    // Group by `provider_id`.
    Provider,
    // Group by the resolved request model.
    Model,
    // Group by `session_id`.
    Session,
}
185
186impl RequestUsageSummaryGroup {
187 pub fn column_name(self) -> &'static str {
188 match self {
189 Self::Station => "station_name",
190 Self::Provider => "provider_id",
191 Self::Model => "model",
192 Self::Session => "session_id",
193 }
194 }
195
196 fn key(self, record: &JsonValue) -> String {
197 match self {
198 Self::Station => station_name(record).to_string(),
199 Self::Provider => str_field(record, "provider_id").unwrap_or("-").to_string(),
200 Self::Model => request_model(record).unwrap_or_else(|| "-".to_string()),
201 Self::Session => str_field(record, "session_id").unwrap_or("-").to_string(),
202 }
203 }
204}
205
/// One row of a usage summary: the grouping key plus its aggregated totals.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RequestUsageSummaryRow {
    // Value of the grouping dimension ("-" when the record lacked it).
    pub group_value: String,
    // Totals accumulated across all matching records for this key.
    pub aggregate: RequestUsageAggregate,
}
211
212pub fn read_request_log_lines(path: &Path) -> std::io::Result<Vec<RequestLogLine>> {
213 let file = File::open(path)?;
214 let reader = BufReader::new(file);
215 Ok(reader
216 .lines()
217 .map_while(Result::ok)
218 .map(RequestLogLine::from_raw)
219 .collect())
220}
221
222pub fn tail_request_log(path: &Path, limit: usize) -> std::io::Result<Vec<RequestLogLine>> {
223 let lines = read_request_log_lines(path)?;
224 let total = lines.len();
225 let start = total.saturating_sub(limit);
226 Ok(lines[start..].to_vec())
227}
228
229pub fn tail_finished_requests_from_log(
230 path: &Path,
231 limit: usize,
232) -> std::io::Result<Vec<FinishedRequest>> {
233 let lines = tail_request_log(path, limit)?;
234 let mut requests = lines
235 .iter()
236 .filter_map(|line| {
237 line.value()
238 .and_then(finished_request_from_request_log_record)
239 })
240 .collect::<Vec<_>>();
241 requests.reverse();
242 Ok(requests)
243}
244
245pub fn find_finished_requests_from_log(
246 path: &Path,
247 filters: &RequestLogFilters,
248 limit: usize,
249) -> std::io::Result<Vec<FinishedRequest>> {
250 let lines = find_request_log(path, filters, limit)?;
251 Ok(lines
252 .iter()
253 .filter_map(|line| {
254 line.value()
255 .and_then(finished_request_from_request_log_record)
256 })
257 .collect())
258}
259
260pub fn summarize_request_log(
261 path: &Path,
262 group: RequestUsageSummaryGroup,
263 filters: &RequestLogFilters,
264 limit: usize,
265) -> std::io::Result<Vec<RequestUsageSummaryRow>> {
266 let lines = read_request_log_lines(path)?;
267 Ok(summarize_request_log_lines(
268 lines.iter(),
269 group,
270 filters,
271 limit,
272 ))
273}
274
275pub fn find_request_log(
276 path: &Path,
277 filters: &RequestLogFilters,
278 limit: usize,
279) -> std::io::Result<Vec<RequestLogLine>> {
280 let lines = read_request_log_lines(path)?;
281 Ok(lines
282 .iter()
283 .rev()
284 .filter(|line| line.value().is_some_and(|record| filters.matches(record)))
285 .take(limit)
286 .cloned()
287 .collect())
288}
289
290fn summarize_request_log_lines<'a>(
291 lines: impl IntoIterator<Item = &'a RequestLogLine>,
292 group: RequestUsageSummaryGroup,
293 filters: &RequestLogFilters,
294 limit: usize,
295) -> Vec<RequestUsageSummaryRow> {
296 let mut aggregate: HashMap<String, RequestUsageAggregate> = HashMap::new();
297 for line in lines {
298 let Some(record) = line.value() else {
299 continue;
300 };
301 if !filters.matches(record) {
302 continue;
303 }
304 let group_value = group.key(record);
305 let duration_ms = u64_field(record, "duration_ms").unwrap_or(0);
306 let service = str_field(record, "service").unwrap_or("-");
307 let usage = usage_metrics(record);
308 let entry = aggregate.entry(group_value).or_default();
309 entry.record(
310 duration_ms,
311 usage.as_ref(),
312 CacheInputAccounting::for_service(service),
313 );
314 }
315
316 let mut items: Vec<RequestUsageSummaryRow> = aggregate
317 .into_iter()
318 .map(|(group_value, aggregate)| RequestUsageSummaryRow {
319 group_value,
320 aggregate,
321 })
322 .collect();
323 items.sort_by(|a, b| {
324 b.aggregate
325 .total_tokens
326 .cmp(&a.aggregate.total_tokens)
327 .then_with(|| a.group_value.cmp(&b.group_value))
328 });
329 items.into_iter().take(limit).collect()
330}
331
332pub fn format_request_log_record_lines(record: &JsonValue) -> Vec<String> {
333 let ts = i64_field(record, "timestamp_ms").unwrap_or(0);
334 let service = str_field(record, "service").unwrap_or("-");
335 let method = str_field(record, "method").unwrap_or("-");
336 let path = str_field(record, "path").unwrap_or("-");
337 let status = u64_field(record, "status_code").unwrap_or(0);
338 let provider = str_field(record, "provider_id").unwrap_or("-");
339 let endpoint = provider_endpoint_display(record);
340 let station = station_name(record);
341 let model = request_model(record).unwrap_or_else(|| "-".to_string());
342 let tier = service_tier_display(record);
343
344 let mut lines = vec![format!(
345 "[{}] {} {} {} status={} endpoint={} provider={} station={} model={} tier={}",
346 ts, service, method, path, status, endpoint, provider, station, model, tier
347 )];
348
349 let duration_ms = u64_field(record, "duration_ms").unwrap_or(0);
350 let ttfb_ms = u64_field(record, "ttfb_ms");
351 let usage = usage_metrics(record);
352 let speed = usage
353 .as_ref()
354 .and_then(|usage| output_tokens_per_second(usage, duration_ms, ttfb_ms));
355 let cost = request_cost_display(service, model.as_str(), usage.as_ref());
356
357 lines.push(format!(
358 " timing duration={} ttfb={} output_speed={} cost={}",
359 format_ms(duration_ms),
360 format_optional_ms(ttfb_ms),
361 format_optional_speed(speed),
362 cost
363 ));
364
365 if let Some(usage) = usage.as_ref() {
366 lines.push(format!(
367 " tokens input={} output={} cache_read={} cache_create={} reasoning={} total={}",
368 usage.input_tokens.max(0),
369 usage.output_tokens.max(0),
370 usage.cache_read_tokens_total(),
371 cache_creation_tokens(usage),
372 reasoning_tokens(usage),
373 total_tokens(usage, CacheInputAccounting::for_service(service))
374 ));
375 } else {
376 lines.push(" tokens -".to_string());
377 }
378
379 lines
380}
381
/// Public wrapper: resolved model name for a log record (see `request_model`).
pub fn request_log_record_model(record: &JsonValue) -> Option<String> {
    request_model(record)
}

/// Public wrapper: station name for a log record ("-" when absent).
pub fn request_log_record_station(record: &JsonValue) -> &str {
    station_name(record)
}

/// Public wrapper: whether the record ran fast-mode / priority tier.
pub fn request_log_record_is_fast(record: &JsonValue) -> bool {
    request_is_fast(record)
}

/// Public wrapper: whether the record shows evidence of at least one retry.
pub fn request_log_record_was_retried(record: &JsonValue) -> bool {
    request_was_retried(record)
}
397
/// Projects a request-log JSON record into a `FinishedRequest` so callers can
/// reuse the live-request rendering/metrics helpers on historical records.
/// Always returns `Some`; missing fields fall back to defaults ("-", 0, None).
pub fn finished_request_from_request_log_record(record: &JsonValue) -> Option<FinishedRequest> {
    let timestamp_ms = u64_field(record, "timestamp_ms").unwrap_or(0);
    // Records without an explicit id fall back to the timestamp as a stand-in.
    let request_id = u64_field(record, "request_id").unwrap_or(timestamp_ms);
    // Status codes outside u16 range collapse to 0 rather than erroring.
    let status_code = u64_field(record, "status_code")
        .and_then(|status| u16::try_from(status).ok())
        .unwrap_or(0);
    let duration_ms = u64_field(record, "duration_ms").unwrap_or(0);
    let usage = usage_metrics(record);
    let model = request_model(record);
    let service = str_field(record, "service").unwrap_or("-");
    // Cost is re-estimated from the current operator catalog, not read back
    // from the log record.
    let cost = estimate_request_cost_from_operator_catalog_for_service(
        model.as_deref(),
        usage.as_ref(),
        CostAdjustments::default(),
        service,
    );
    // Retry and route-decision payloads are deserialized best-effort; a shape
    // mismatch simply yields `None`.
    let retry = record
        .get("retry")
        .and_then(|retry| serde_json::from_value(retry.clone()).ok());
    let route_decision = record.get("route_decision").and_then(|route_decision| {
        serde_json::from_value::<RouteDecisionProvenance>(route_decision.clone()).ok()
    });

    let mut request = FinishedRequest {
        id: request_id,
        trace_id: str_field(record, "trace_id").map(ToOwned::to_owned),
        session_id: str_field(record, "session_id").map(ToOwned::to_owned),
        client_name: str_field(record, "client_name").map(ToOwned::to_owned),
        client_addr: str_field(record, "client_addr").map(ToOwned::to_owned),
        cwd: str_field(record, "cwd").map(ToOwned::to_owned),
        model,
        reasoning_effort: str_field(record, "reasoning_effort").map(ToOwned::to_owned),
        service_tier: service_tier_value(record),
        // "-" placeholders are normalized back to `None` for the station name.
        station_name: non_dash(station_name(record)).map(ToOwned::to_owned),
        provider_id: str_field(record, "provider_id").map(ToOwned::to_owned),
        upstream_base_url: str_field(record, "upstream_base_url").map(ToOwned::to_owned),
        route_decision,
        usage,
        cost,
        retry,
        observability: RequestObservability::default(),
        service: service.to_string(),
        method: str_field(record, "method").unwrap_or("-").to_string(),
        path: str_field(record, "path").unwrap_or("-").to_string(),
        status_code,
        duration_ms,
        ttfb_ms: u64_field(record, "ttfb_ms"),
        streaming: record
            .get("streaming")
            .and_then(|value| value.as_bool())
            .unwrap_or(false),
        ended_at_ms: timestamp_ms,
    };
    // Observability is derived state; recompute it from the fields above.
    request.refresh_observability();
    Some(request)
}
454
/// Case-insensitive substring match. A blank `expected` (after trimming)
/// matches anything; a missing `value` matches nothing.
fn field_contains(value: Option<&str>, expected: &str) -> bool {
    let needle = expected.trim().to_ascii_lowercase();
    if needle.is_empty() {
        return true;
    }
    match value {
        Some(haystack) => haystack.to_ascii_lowercase().contains(&needle),
        None => false,
    }
}
464
465fn request_is_fast(record: &JsonValue) -> bool {
466 record
467 .get("observability")
468 .and_then(|observability| observability.get("fast_mode"))
469 .and_then(|value| value.as_bool())
470 .unwrap_or(false)
471 || service_tier_display(record).eq_ignore_ascii_case("priority(fast)")
472}
473
474fn request_was_retried(record: &JsonValue) -> bool {
475 if record
476 .get("observability")
477 .and_then(|observability| observability.get("retried"))
478 .and_then(|value| value.as_bool())
479 .unwrap_or(false)
480 {
481 return true;
482 }
483 if record
484 .get("observability")
485 .and_then(|observability| observability.get("attempt_count"))
486 .and_then(|value| value.as_u64())
487 .is_some_and(|attempts| attempts > 1)
488 {
489 return true;
490 }
491 if record
492 .get("retry")
493 .and_then(|retry| retry.get("route_attempts"))
494 .and_then(|attempts| attempts.as_array())
495 .is_some_and(|attempts| attempts.len() > 1)
496 {
497 return true;
498 }
499 if record
500 .get("retry")
501 .and_then(|retry| retry.get("attempts"))
502 .and_then(|attempts| attempts.as_u64())
503 .is_some_and(|attempts| attempts > 1)
504 {
505 return true;
506 }
507 record
508 .get("retry")
509 .and_then(|retry| retry.get("upstream_chain"))
510 .and_then(|attempts| attempts.as_array())
511 .is_some_and(|attempts| attempts.len() > 1)
512}
513
514fn usage_metrics(record: &JsonValue) -> Option<UsageMetrics> {
515 record
516 .get("usage")
517 .and_then(|usage| serde_json::from_value::<UsageMetrics>(usage.clone()).ok())
518}
519
520fn request_cost_display(service: &str, model: &str, usage: Option<&UsageMetrics>) -> String {
521 let model = model.trim();
522 if model.is_empty() || model == "-" {
523 return "-".to_string();
524 }
525 let cost = estimate_request_cost_from_operator_catalog_for_service(
526 Some(model),
527 usage,
528 CostAdjustments::default(),
529 service,
530 );
531 cost.display_total_with_confidence()
532}
533
534fn output_tokens_per_second(
535 usage: &UsageMetrics,
536 duration_ms: u64,
537 ttfb_ms: Option<u64>,
538) -> Option<f64> {
539 let output_tokens = usage.output_tokens.max(0);
540 if output_tokens == 0 || duration_ms == 0 {
541 return None;
542 }
543 let generation_ms = match ttfb_ms {
544 Some(ttfb) if ttfb > 0 && ttfb < duration_ms => duration_ms.saturating_sub(ttfb),
545 _ => duration_ms,
546 };
547 if generation_ms == 0 {
548 return None;
549 }
550 Some(output_tokens as f64 / (generation_ms as f64 / 1000.0))
551}
552
553fn request_model(record: &JsonValue) -> Option<String> {
554 str_field(record, "model")
555 .map(ToOwned::to_owned)
556 .or_else(|| nested_str(record, &["route_decision", "effective_model", "value"]))
557 .or_else(|| model_from_route_attempts(record))
558 .or_else(|| model_from_legacy_retry_chain(record))
559}
560
561fn model_from_route_attempts(record: &JsonValue) -> Option<String> {
562 record
563 .get("retry")
564 .and_then(|retry| retry.get("route_attempts"))
565 .and_then(|attempts| attempts.as_array())
566 .and_then(|attempts| {
567 attempts
568 .iter()
569 .rev()
570 .filter(|attempt| {
571 !attempt
572 .get("skipped")
573 .and_then(|value| value.as_bool())
574 .unwrap_or(false)
575 })
576 .find_map(|attempt| str_field(attempt, "model").map(ToOwned::to_owned))
577 })
578}
579
580fn model_from_legacy_retry_chain(record: &JsonValue) -> Option<String> {
581 record
582 .get("retry")
583 .and_then(|retry| retry.get("upstream_chain"))
584 .and_then(|chain| chain.as_array())
585 .and_then(|chain| {
586 chain
587 .iter()
588 .rev()
589 .filter_map(|entry| entry.as_str())
590 .find_map(|entry| raw_kv_field(entry, "model"))
591 })
592}
593
/// Extracts `key=value` from a whitespace-separated raw string. Only the
/// FIRST token carrying the prefix is considered; trailing commas are
/// stripped, and empty or "-" values yield `None`.
fn raw_kv_field(raw: &str, key: &str) -> Option<String> {
    let prefix = format!("{key}=");
    for part in raw.split_whitespace() {
        if let Some(tail) = part.strip_prefix(&prefix) {
            let value = tail.trim().trim_matches(',').to_string();
            if value.is_empty() || value == "-" {
                return None;
            }
            return Some(value);
        }
    }
    None
}
601
602fn service_tier_display(record: &JsonValue) -> String {
603 let tier = service_tier_value(record).unwrap_or_else(|| "-".to_string());
604 if tier.eq_ignore_ascii_case("priority") {
605 "priority(fast)".to_string()
606 } else {
607 tier
608 }
609}
610
611fn service_tier_value(record: &JsonValue) -> Option<String> {
612 record.get("service_tier").and_then(|tier| {
613 tier.as_str()
614 .map(ToOwned::to_owned)
615 .or_else(|| service_tier_log_value(tier))
616 })
617}
618
619fn service_tier_log_value(tier: &JsonValue) -> Option<String> {
620 ["actual", "effective", "requested"]
621 .iter()
622 .find_map(|key| str_field(tier, key).map(ToOwned::to_owned))
623}
624
625fn station_name(record: &JsonValue) -> &str {
626 str_field(record, "station_name")
627 .or_else(|| str_field(record, "config_name"))
628 .unwrap_or("-")
629}
630
631fn provider_endpoint_display(record: &JsonValue) -> String {
632 str_field(record, "provider_endpoint_key")
633 .map(ToOwned::to_owned)
634 .or_else(|| {
635 let provider = str_field(record, "provider_id")?;
636 let endpoint = str_field(record, "endpoint_id")?;
637 Some(format!("{provider}.{endpoint}"))
638 })
639 .unwrap_or_else(|| "-".to_string())
640}
641
/// Maps the "-" placeholder to `None`; any other value passes through.
fn non_dash(value: &str) -> Option<&str> {
    if value == "-" { None } else { Some(value) }
}
645
646fn str_field<'a>(record: &'a JsonValue, key: &str) -> Option<&'a str> {
647 record
648 .get(key)
649 .and_then(|value| value.as_str())
650 .map(str::trim)
651 .filter(|value| !value.is_empty())
652}
653
654fn nested_str(record: &JsonValue, path: &[&str]) -> Option<String> {
655 let mut current = record;
656 for key in path {
657 current = current.get(*key)?;
658 }
659 current
660 .as_str()
661 .map(str::trim)
662 .filter(|value| !value.is_empty())
663 .map(ToOwned::to_owned)
664}
665
666fn i64_field(record: &JsonValue, key: &str) -> Option<i64> {
667 record.get(key).and_then(|value| value.as_i64())
668}
669
670fn u64_field(record: &JsonValue, key: &str) -> Option<u64> {
671 record.get(key).and_then(|value| value.as_u64())
672}
673
/// Reasoning output tokens for a request, clamped to be non-negative.
fn reasoning_tokens(usage: &UsageMetrics) -> i64 {
    usage.reasoning_output_tokens_total().max(0)
}
677
/// Cache-creation input tokens for a request, clamped to be non-negative.
fn cache_creation_tokens(usage: &UsageMetrics) -> i64 {
    usage.cache_creation_tokens_total().max(0)
}
681
682fn total_tokens(usage: &UsageMetrics, accounting: CacheInputAccounting) -> i64 {
683 if usage.total_tokens > 0 {
684 usage.total_tokens
685 } else {
686 let breakdown = usage.cache_usage_breakdown(accounting);
687 breakdown
688 .effective_input_tokens
689 .saturating_add(usage.output_tokens.max(0))
690 .saturating_add(breakdown.cache_read_input_tokens)
691 .saturating_add(breakdown.cache_creation_input_tokens)
692 }
693}
694
/// Formats a millisecond count as e.g. "1500ms".
fn format_ms(value: u64) -> String {
    let mut text = value.to_string();
    text.push_str("ms");
    text
}
698
699fn format_optional_ms(value: Option<u64>) -> String {
700 value.map(format_ms).unwrap_or_else(|| "-".to_string())
701}
702
/// Formats an optional speed as e.g. "20.00 tok/s"; "-" when absent.
fn format_optional_speed(value: Option<f64>) -> String {
    match value {
        Some(speed) => format!("{speed:.2} tok/s"),
        None => "-".to_string(),
    }
}
708
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Rendering: the header/timing/token lines pick up the endpoint key, the
    // model derived from route attempts, the fast-tier display, cache counts,
    // and the generation speed (30 tokens over 1500ms post-TTFB = 20 tok/s).
    #[test]
    fn display_lines_include_route_model_fast_cache_and_speed() {
        let record = json!({
            "timestamp_ms": 123,
            "service": "codex",
            "method": "POST",
            "path": "/v1/responses",
            "status_code": 200,
            "duration_ms": 2000,
            "ttfb_ms": 500,
            "provider_id": "relay",
            "endpoint_id": "default",
            "provider_endpoint_key": "codex/relay/default",
            "service_tier": { "effective": "priority" },
            "usage": {
                "input_tokens": 1000,
                "output_tokens": 30,
                "cache_read_input_tokens": 10,
                "cache_creation_5m_input_tokens": 5,
                "reasoning_output_tokens": 7,
                "total_tokens": 1045
            },
            "retry": {
                "route_attempts": [
                    { "decision": "completed", "model": "gpt-5" }
                ]
            }
        });

        let lines = format_request_log_record_lines(&record);

        assert!(lines[0].contains("endpoint=codex/relay/default"));
        assert!(lines[0].contains("station=-"));
        assert!(lines[0].contains("model=gpt-5"));
        assert!(lines[0].contains("tier=priority(fast)"));
        assert!(lines[1].contains("output_speed=20.00 tok/s"));
        assert!(lines[1].contains("cost="));
        assert!(lines[2].contains("cache_read=10"));
        assert!(lines[2].contains("cache_create=5"));
        assert!(lines[2].contains("reasoning=7"));
    }

    // Model resolution falls back to parsing `model=` out of the legacy
    // upstream-chain strings.
    #[test]
    fn request_model_reads_legacy_retry_chain_model() {
        let record = json!({
            "retry": {
                "upstream_chain": [
                    "main:https://relay.example/v1 (idx=0) status=200 class=- model=gpt-5.4-mini"
                ]
            }
        });

        assert_eq!(request_model(&record).as_deref(), Some("gpt-5.4-mini"));
    }

    // A usage-less record still counts toward requests/duration; the summary
    // line shows per-category tokens and the averaged duration (300ms).
    #[test]
    fn usage_aggregate_summary_includes_cache_and_average_duration() {
        let mut aggregate = RequestUsageAggregate::default();
        aggregate.record(
            200,
            Some(&UsageMetrics {
                input_tokens: 10,
                output_tokens: 5,
                cache_read_input_tokens: 3,
                cache_creation_1h_input_tokens: 2,
                reasoning_output_tokens: 4,
                ..UsageMetrics::default()
            }),
            CacheInputAccounting::default(),
        );
        aggregate.record(400, None, CacheInputAccounting::default());

        assert_eq!(
            aggregate.summary_line("main"),
            "main | 2 | 10 | 5 | 3 | 2 | 4 | 20 | 300"
        );
    }

    // All filters at once: substring matches, a 4xx status window, the
    // fast-tier flag (via service_tier "priority"), and retry evidence.
    #[test]
    fn filters_match_route_model_fast_retry_and_status() {
        let record = json!({
            "session_id": "sid-abc",
            "station_name": "main-station",
            "provider_id": "relay-one",
            "status_code": 429,
            "service_tier": { "actual": "priority" },
            "observability": {
                "attempt_count": 2,
                "retried": true
            },
            "retry": {
                "route_attempts": [
                    { "decision": "failed_status", "model": "gpt-5.4-high" },
                    { "decision": "completed", "model": "gpt-5.4-high" }
                ]
            }
        });

        let filters = RequestLogFilters {
            session: Some("abc".to_string()),
            model: Some("5.4".to_string()),
            station: Some("main".to_string()),
            provider: Some("relay".to_string()),
            status_min: Some(400),
            status_max: Some(499),
            fast: true,
            retried: true,
        };

        assert!(filters.matches(&record));
    }

    // A model filter that is not a substring of the record's model rejects it.
    #[test]
    fn filters_reject_nonmatching_model() {
        let record = json!({
            "model": "gpt-5.4-high",
            "status_code": 200
        });
        let filters = RequestLogFilters {
            model: Some("mini".to_string()),
            ..RequestLogFilters::default()
        };

        assert!(!filters.matches(&record));
    }

    // Invalid JSON lines survive in the raw tail but contribute nothing to
    // summaries; valid records still aggregate and sort by total tokens.
    #[test]
    fn tail_keeps_invalid_raw_lines_but_summary_ignores_them() {
        let lines = [
            RequestLogLine::from_raw(
                r#"{"station_name":"a","duration_ms":100,"usage":{"total_tokens":7}}"#,
            ),
            RequestLogLine::from_raw("not-json"),
            RequestLogLine::from_raw(
                r#"{"station_name":"b","duration_ms":200,"usage":{"total_tokens":3}}"#,
            ),
        ];

        assert!(!lines[1].is_valid_json());
        let rows = summarize_request_log_lines(
            lines.iter(),
            RequestUsageSummaryGroup::Station,
            &RequestLogFilters::default(),
            10,
        );

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].group_value, "a");
        assert_eq!(rows[0].aggregate.total_tokens, 7);
        assert_eq!(rows[1].group_value, "b");
        assert_eq!(rows[1].aggregate.total_tokens, 3);
    }

    // Grouping by provider/model/session with and without status filters.
    #[test]
    fn summary_can_group_by_provider_model_or_session_with_filters() {
        let lines = [
            RequestLogLine::from_raw(
                r#"{"session_id":"sid-a","station_name":"s1","provider_id":"p1","model":"gpt-5","status_code":200,"usage":{"total_tokens":7}}"#,
            ),
            RequestLogLine::from_raw(
                r#"{"session_id":"sid-b","station_name":"s1","provider_id":"p1","model":"gpt-5.4","status_code":429,"usage":{"total_tokens":11}}"#,
            ),
            RequestLogLine::from_raw(
                r#"{"session_id":"sid-b","station_name":"s2","provider_id":"p2","model":"gpt-5.4","status_code":200,"usage":{"total_tokens":3}}"#,
            ),
        ];

        let provider_rows = summarize_request_log_lines(
            lines.iter(),
            RequestUsageSummaryGroup::Provider,
            &RequestLogFilters::default(),
            10,
        );
        assert_eq!(provider_rows[0].group_value, "p1");
        assert_eq!(provider_rows[0].aggregate.total_tokens, 18);
        assert_eq!(provider_rows[1].group_value, "p2");

        let model_rows = summarize_request_log_lines(
            lines.iter(),
            RequestUsageSummaryGroup::Model,
            &RequestLogFilters {
                status_min: Some(400),
                ..RequestLogFilters::default()
            },
            10,
        );
        assert_eq!(model_rows.len(), 1);
        assert_eq!(model_rows[0].group_value, "gpt-5.4");
        assert_eq!(model_rows[0].aggregate.total_tokens, 11);

        let session_rows = summarize_request_log_lines(
            lines.iter(),
            RequestUsageSummaryGroup::Session,
            &RequestLogFilters::default(),
            10,
        );
        assert_eq!(session_rows[0].group_value, "sid-b");
        assert_eq!(session_rows[0].aggregate.requests, 2);
    }

    // Full projection: a log record round-trips into a FinishedRequest whose
    // derived helpers (fast mode, attempt count, speed) behave like live ones.
    #[test]
    fn request_log_record_projects_to_finished_request_for_ui_reuse() {
        let record = json!({
            "timestamp_ms": 1234,
            "request_id": 42,
            "trace_id": "codex-42",
            "service": "codex",
            "method": "POST",
            "path": "/v1/responses",
            "status_code": 200,
            "duration_ms": 1500,
            "ttfb_ms": 500,
            "station_name": "primary",
            "provider_id": "relay",
            "upstream_base_url": "https://relay.example/v1",
            "session_id": "sid-a",
            "reasoning_effort": "medium",
            "service_tier": { "actual": "priority" },
            "usage": {
                "input_tokens": 100,
                "output_tokens": 50,
                "total_tokens": 150
            },
            "retry": {
                "attempts": 2,
                "upstream_chain": [
                    "primary:https://relay.example/v1 (idx=0) status=429 class=rate_limit model=gpt-5.4",
                    "primary:https://relay.example/v1 (idx=1) status=200 class=- model=gpt-5.4"
                ]
            }
        });

        let request =
            finished_request_from_request_log_record(&record).expect("finished request projection");

        assert_eq!(request.id, 42);
        assert_eq!(request.trace_id.as_deref(), Some("codex-42"));
        assert_eq!(request.session_id.as_deref(), Some("sid-a"));
        assert_eq!(request.model.as_deref(), Some("gpt-5.4"));
        assert_eq!(request.service_tier.as_deref(), Some("priority"));
        assert!(request.is_fast_mode());
        assert_eq!(request.attempt_count(), 2);
        assert_eq!(request.output_tokens_per_second(), Some(50.0));
        assert_eq!(request.ended_at_ms, 1234);
    }
}
959}