Skip to main content

codex_ops/limits/
events.rs

1use super::reports::{RateLimitParseDiagnostics, RateLimitSample, SourceSpan};
2use chrono::{DateTime, TimeZone, Utc};
3use serde_json::{Map, Value};
4
/// Per-line context that `parse_rate_limit_line` copies into every sample it produces.
#[derive(Clone, Debug, Default)]
pub struct RateLimitLineContext<'a> {
    /// Session identifier stored (as an owned string) on each emitted sample.
    pub session_id: &'a str,
    /// Optional account identifier stored on each emitted sample.
    pub account_id: Option<&'a str>,
    /// Optional location of the parsed line; cloned into each emitted sample.
    pub source: Option<SourceSpan>,
}
11
12pub fn parse_rate_limit_line(
13    line: &str,
14    context: RateLimitLineContext<'_>,
15    diagnostics: &mut RateLimitParseDiagnostics,
16) -> Vec<RateLimitSample> {
17    let value = match serde_json::from_str::<Value>(line) {
18        Ok(value) => value,
19        Err(_) => {
20            diagnostics.invalid_json_lines += 1;
21            return Vec::new();
22        }
23    };
24
25    let Some(event) = value.as_object() else {
26        return Vec::new();
27    };
28    if string_field(event, "type").as_deref() != Some("event_msg") {
29        return Vec::new();
30    }
31
32    let Some(payload) = object_field(event, "payload") else {
33        diagnostics.missing_rate_limits += 1;
34        return Vec::new();
35    };
36    let Some(rate_limits_value) = payload.get("rate_limits") else {
37        diagnostics.missing_rate_limits += 1;
38        return Vec::new();
39    };
40
41    diagnostics.rate_limit_events += 1;
42    if rate_limits_value.is_null() {
43        diagnostics.null_rate_limits += 1;
44        return Vec::new();
45    }
46
47    let Some(rate_limits) = rate_limits_value.as_object() else {
48        diagnostics.missing_windows += 1;
49        return Vec::new();
50    };
51    let Some(timestamp) = event.get("timestamp").and_then(value_to_utc) else {
52        diagnostics.missing_timestamps += 1;
53        return Vec::new();
54    };
55
56    count_unknown_windows(rate_limits, diagnostics);
57
58    let plan_type = string_field(rate_limits, "plan_type");
59    let limit_id = string_field(rate_limits, "limit_id");
60    let mut samples = Vec::new();
61
62    for window_name in ["primary", "secondary"] {
63        let Some(window_value) = rate_limits.get(window_name) else {
64            continue;
65        };
66        let Some(window) = window_value.as_object() else {
67            diagnostics.invalid_window_minutes += 1;
68            continue;
69        };
70        if let Some(sample) = parse_window_sample(
71            timestamp,
72            window_name,
73            window,
74            &context,
75            plan_type.as_deref(),
76            limit_id.as_deref(),
77            diagnostics,
78        ) {
79            samples.push(sample);
80        }
81    }
82
83    if samples.is_empty() {
84        diagnostics.missing_windows += 1;
85    }
86    diagnostics.included_samples += samples.len() as i64;
87    samples
88}
89
90fn parse_window_sample(
91    timestamp: DateTime<Utc>,
92    window_name: &str,
93    window: &Map<String, Value>,
94    context: &RateLimitLineContext<'_>,
95    plan_type: Option<&str>,
96    limit_id: Option<&str>,
97    diagnostics: &mut RateLimitParseDiagnostics,
98) -> Option<RateLimitSample> {
99    let Some(window_minutes) = window.get("window_minutes").and_then(value_to_i64) else {
100        diagnostics.invalid_window_minutes += 1;
101        return None;
102    };
103    if window_minutes <= 0 {
104        diagnostics.invalid_window_minutes += 1;
105        return None;
106    }
107    let Some(used_percent) = window.get("used_percent").and_then(value_to_f64) else {
108        diagnostics.invalid_used_percent += 1;
109        return None;
110    };
111    let Some(resets_at) = window.get("resets_at").and_then(value_to_unix_seconds) else {
112        diagnostics.invalid_resets_at += 1;
113        return None;
114    };
115
116    if !(0.0..=100.0).contains(&used_percent) {
117        diagnostics.out_of_range_percent += 1;
118    }
119
120    Some(RateLimitSample {
121        timestamp,
122        session_id: context.session_id.to_string(),
123        account_id: context.account_id.map(str::to_string),
124        plan_type: plan_type.map(str::to_string),
125        limit_id: limit_id.map(str::to_string),
126        window: window_label(window_name, window_minutes),
127        window_minutes,
128        used_percent,
129        remaining_percent: 100.0 - used_percent,
130        resets_at,
131        source: context.source.clone(),
132    })
133}
134
135fn count_unknown_windows(
136    rate_limits: &Map<String, Value>,
137    diagnostics: &mut RateLimitParseDiagnostics,
138) {
139    for (key, value) in rate_limits {
140        if matches!(
141            key.as_str(),
142            "primary" | "secondary" | "plan_type" | "limit_id"
143        ) {
144            continue;
145        }
146        if value.is_object() {
147            diagnostics.unknown_windows += 1;
148        }
149    }
150}
151
152fn object_field<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a Map<String, Value>> {
153    object.get(key).and_then(Value::as_object)
154}
155
156fn string_field(object: &Map<String, Value>, key: &str) -> Option<String> {
157    object
158        .get(key)
159        .and_then(Value::as_str)
160        .map(str::trim)
161        .filter(|value| !value.is_empty())
162        .map(str::to_string)
163}
164
165fn value_to_f64(value: &Value) -> Option<f64> {
166    let parsed = match value {
167        Value::Number(number) => number.as_f64(),
168        Value::String(value) => value.trim().parse::<f64>().ok(),
169        _ => None,
170    }?;
171
172    parsed.is_finite().then_some(parsed)
173}
174
175fn value_to_i64(value: &Value) -> Option<i64> {
176    match value {
177        Value::Number(number) => number
178            .as_i64()
179            .or_else(|| number.as_u64().and_then(|value| i64::try_from(value).ok()))
180            .or_else(|| {
181                number.as_f64().and_then(|value| {
182                    if value.is_finite()
183                        && value.fract() == 0.0
184                        && value >= i64::MIN as f64
185                        && value <= i64::MAX as f64
186                    {
187                        Some(value as i64)
188                    } else {
189                        None
190                    }
191                })
192            }),
193        Value::String(value) => value.trim().parse::<i64>().ok(),
194        _ => None,
195    }
196}
197
198fn value_to_unix_seconds(value: &Value) -> Option<DateTime<Utc>> {
199    let seconds = value_to_i64(value)?;
200    Utc.timestamp_opt(seconds, 0).single()
201}
202
203fn value_to_utc(value: &Value) -> Option<DateTime<Utc>> {
204    match value {
205        Value::String(value) => DateTime::parse_from_rfc3339(value.trim())
206            .ok()
207            .map(|timestamp| timestamp.with_timezone(&Utc)),
208        Value::Number(_) => {
209            let millis = value_to_i64(value)?;
210            Utc.timestamp_millis_opt(millis).single()
211        }
212        _ => None,
213    }
214}
215
/// Chooses the display label for a window: `"5h"` for 300 minutes, `"7d"` for 10080 minutes,
/// and the window's own name for any other duration.
fn window_label(window_name: &str, window_minutes: i64) -> String {
    const FIVE_HOURS_IN_MINUTES: i64 = 5 * 60;
    const SEVEN_DAYS_IN_MINUTES: i64 = 7 * 24 * 60;

    let label = match window_minutes {
        FIVE_HOURS_IN_MINUTES => "5h",
        SEVEN_DAYS_IN_MINUTES => "7d",
        _ => window_name,
    };
    label.to_owned()
}
223
// Unit tests for `parse_rate_limit_line`: each test feeds one JSON line through the parser and
// checks both the returned samples and the diagnostic counters.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Happy path: both windows are present and valid, so two samples come back with every
    // context field copied over and the 300 / 10080 minute windows labelled "5h" / "7d".
    #[test]
    fn parses_primary_and_secondary_rate_limit_samples() {
        let line = json!({
            "timestamp": "2026-05-10T09:00:01.500Z",
            "type": "event_msg",
            "payload": {
                "rate_limits": {
                    "primary": {
                        "window_minutes": 300,
                        "used_percent": 42.5,
                        "resets_at": 1778421600
                    },
                    "secondary": {
                        "window_minutes": 10080,
                        "used_percent": 84.0,
                        "resets_at": 1778490000
                    },
                    "plan_type": "pro",
                    "limit_id": "fixture-alpha-pre-reset"
                }
            }
        })
        .to_string();
        let mut diagnostics = RateLimitParseDiagnostics::default();

        let samples = parse_rate_limit_line(
            &line,
            RateLimitLineContext {
                session_id: "rust-run-session-alpha",
                account_id: Some("account-fixture"),
                source: Some(SourceSpan {
                    path: "/tmp/session.jsonl".to_string(),
                    line_number: 3,
                }),
            },
            &mut diagnostics,
        );

        assert_eq!(samples.len(), 2);
        assert_eq!(diagnostics.rate_limit_events, 1);
        assert_eq!(diagnostics.included_samples, 2);

        // Samples are emitted in window order: primary first, then secondary.
        let primary = &samples[0];
        assert_eq!(primary.session_id, "rust-run-session-alpha");
        assert_eq!(primary.account_id.as_deref(), Some("account-fixture"));
        assert_eq!(primary.plan_type.as_deref(), Some("pro"));
        assert_eq!(primary.limit_id.as_deref(), Some("fixture-alpha-pre-reset"));
        assert_eq!(primary.window, "5h");
        assert_eq!(primary.window_minutes, 300);
        assert_eq!(primary.used_percent, 42.5);
        assert_eq!(primary.remaining_percent, 57.5);
        assert_eq!(
            primary.resets_at,
            Utc.timestamp_opt(1778421600, 0).single().unwrap()
        );
        assert_eq!(
            primary.source.as_ref().map(|source| source.path.as_str()),
            Some("/tmp/session.jsonl")
        );
        assert_eq!(
            primary.source.as_ref().map(|source| source.line_number),
            Some(3)
        );

        let secondary = &samples[1];
        assert_eq!(secondary.window, "7d");
        assert_eq!(secondary.window_minutes, 10080);
        assert_eq!(secondary.used_percent, 84.0);

        // The source span must not appear in serialized output; every other field is expected
        // under a camelCase key.
        let serialized = serde_json::to_value(primary).expect("sample json");
        assert!(serialized.get("source").is_none());
        assert!(serialized.get("sourcePath").is_none());
        assert!(serialized.get("lineNumber").is_none());
        for key in [
            "timestamp",
            "sessionId",
            "accountId",
            "planType",
            "limitId",
            "window",
            "windowMinutes",
            "usedPercent",
            "remainingPercent",
            "resetsAt",
        ] {
            assert!(serialized.get(key).is_some(), "missing key {key}");
        }
    }

    // A lone primary window is enough: the absent secondary window must not be reported as a
    // missing window.
    #[test]
    fn parses_primary_only_sample_without_missing_window_diagnostic() {
        let line = json!({
            "timestamp": "2026-05-12T13:05:00.000Z",
            "type": "event_msg",
            "payload": {
                "rate_limits": {
                    "primary": {
                        "window_minutes": 300,
                        "used_percent": 18.0,
                        "resets_at": 1778605200
                    },
                    "plan_type": "plus"
                }
            }
        })
        .to_string();
        let mut diagnostics = RateLimitParseDiagnostics::default();

        let samples = parse_rate_limit_line(
            &line,
            RateLimitLineContext {
                session_id: "rust-run-session-delta",
                account_id: None,
                source: None,
            },
            &mut diagnostics,
        );

        assert_eq!(samples.len(), 1);
        assert_eq!(samples[0].account_id, None);
        assert_eq!(samples[0].plan_type.as_deref(), Some("plus"));
        assert_eq!(diagnostics.missing_windows, 0);
    }

    // `rate_limits: null` still counts as a rate-limit event but produces no samples.
    #[test]
    fn null_rate_limits_are_counted_and_skipped() {
        let line = json!({
            "timestamp": "2026-05-12T12:10:00.000Z",
            "type": "event_msg",
            "payload": {
                "rate_limits": null
            }
        })
        .to_string();
        let mut diagnostics = RateLimitParseDiagnostics::default();

        let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);

        assert!(samples.is_empty());
        assert_eq!(diagnostics.rate_limit_events, 1);
        assert_eq!(diagnostics.null_rate_limits, 1);
    }

    // A payload without a `rate_limits` key is counted as missing and is not a rate-limit event.
    #[test]
    fn missing_rate_limits_are_counted_on_event_payloads() {
        let line = json!({
            "timestamp": "2026-05-12T12:10:00.000Z",
            "type": "event_msg",
            "payload": {}
        })
        .to_string();
        let mut diagnostics = RateLimitParseDiagnostics::default();

        let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);

        assert!(samples.is_empty());
        assert_eq!(diagnostics.missing_rate_limits, 1);
        assert_eq!(diagnostics.rate_limit_events, 0);
    }

    // Only the malformed line bumps `invalid_json_lines`; a JSON array and an event of another
    // type are skipped without being counted as invalid.
    #[test]
    fn invalid_json_counts_but_non_object_json_and_non_event_lines_are_skipped() {
        let mut diagnostics = RateLimitParseDiagnostics::default();

        assert!(parse_rate_limit_line("{not-json", default_context(), &mut diagnostics).is_empty());
        assert!(parse_rate_limit_line("[]", default_context(), &mut diagnostics).is_empty());
        assert!(parse_rate_limit_line(
            &json!({"type": "session_meta"}).to_string(),
            default_context(),
            &mut diagnostics
        )
        .is_empty());

        assert_eq!(diagnostics.invalid_json_lines, 1);
        assert_eq!(diagnostics.rate_limit_events, 0);
    }

    // Validation stops at the first bad field of each window: primary lacks `window_minutes`,
    // secondary fails on `used_percent` before `resets_at` is examined (so `invalid_resets_at`
    // stays 0), and the object-valued `tertiary` entry is reported as an unknown window.
    #[test]
    fn invalid_window_fields_are_counted_without_samples() {
        let line = json!({
            "timestamp": "2026-05-12T12:10:00.000Z",
            "type": "event_msg",
            "payload": {
                "rate_limits": {
                    "primary": {
                        "used_percent": 10.0,
                        "resets_at": 1778605200
                    },
                    "secondary": {
                        "window_minutes": 10080,
                        "used_percent": "not-percent",
                        "resets_at": "not-reset"
                    },
                    "tertiary": {
                        "window_minutes": 60,
                        "used_percent": 1.0,
                        "resets_at": 1778605200
                    }
                }
            }
        })
        .to_string();
        let mut diagnostics = RateLimitParseDiagnostics::default();

        let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);

        assert!(samples.is_empty());
        assert_eq!(diagnostics.invalid_window_minutes, 1);
        assert_eq!(diagnostics.invalid_used_percent, 1);
        assert_eq!(diagnostics.invalid_resets_at, 0);
        assert_eq!(diagnostics.unknown_windows, 1);
        assert_eq!(diagnostics.missing_windows, 1);
    }

    // A zero-length window is rejected as invalid window minutes before the other fields are
    // looked at.
    #[test]
    fn zero_window_minutes_are_counted_without_samples() {
        let line = json!({
            "timestamp": "2026-05-12T12:10:00.000Z",
            "type": "event_msg",
            "payload": {
                "rate_limits": {
                    "primary": {
                        "window_minutes": 0,
                        "used_percent": 10.0,
                        "resets_at": 1778605200
                    }
                }
            }
        })
        .to_string();
        let mut diagnostics = RateLimitParseDiagnostics::default();

        let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);

        assert!(samples.is_empty());
        assert_eq!(diagnostics.invalid_window_minutes, 1);
        assert_eq!(diagnostics.invalid_used_percent, 0);
        assert_eq!(diagnostics.invalid_resets_at, 0);
    }

    // With valid minutes and percent, an unparseable `resets_at` is what gets reported; the
    // event then has no usable window, so `missing_windows` is bumped as well.
    #[test]
    fn invalid_resets_at_is_counted_after_valid_percent() {
        let line = json!({
            "timestamp": "2026-05-12T12:10:00.000Z",
            "type": "event_msg",
            "payload": {
                "rate_limits": {
                    "primary": {
                        "window_minutes": 300,
                        "used_percent": 10.0,
                        "resets_at": "not-reset"
                    }
                }
            }
        })
        .to_string();
        let mut diagnostics = RateLimitParseDiagnostics::default();

        let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);

        assert!(samples.is_empty());
        assert_eq!(diagnostics.invalid_resets_at, 1);
        assert_eq!(diagnostics.missing_windows, 1);
    }

    // Percentages above 100 are kept as-is (yielding a negative remainder) and only flagged.
    // The fixture also exercises numeric strings for `window_minutes` and `resets_at`.
    #[test]
    fn out_of_range_percent_is_preserved_and_flagged() {
        let line = json!({
            "timestamp": "2026-05-12T12:10:00.000Z",
            "type": "event_msg",
            "payload": {
                "rate_limits": {
                    "primary": {
                        "window_minutes": "300",
                        "used_percent": 125.0,
                        "resets_at": "1778605200"
                    }
                }
            }
        })
        .to_string();
        let mut diagnostics = RateLimitParseDiagnostics::default();

        let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);

        assert_eq!(samples.len(), 1);
        assert_eq!(samples[0].used_percent, 125.0);
        assert_eq!(samples[0].remaining_percent, -25.0);
        assert_eq!(diagnostics.out_of_range_percent, 1);
    }

    // An unparseable timestamp drops the whole event, but only after it has been counted as a
    // rate-limit event. (Only the invalid-timestamp case is exercised here.)
    #[test]
    fn missing_or_invalid_timestamp_skips_samples() {
        let line = json!({
            "timestamp": "not-a-date",
            "type": "event_msg",
            "payload": {
                "rate_limits": {
                    "primary": {
                        "window_minutes": 300,
                        "used_percent": 10.0,
                        "resets_at": 1778605200
                    }
                }
            }
        })
        .to_string();
        let mut diagnostics = RateLimitParseDiagnostics::default();

        let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);

        assert!(samples.is_empty());
        assert_eq!(diagnostics.rate_limit_events, 1);
        assert_eq!(diagnostics.missing_timestamps, 1);
    }

    // Minimal context shared by tests that do not inspect the context fields of the samples.
    fn default_context<'a>() -> RateLimitLineContext<'a> {
        RateLimitLineContext {
            session_id: "session-fixture",
            account_id: None,
            source: None,
        }
    }
}