1use super::reports::{RateLimitParseDiagnostics, RateLimitSample, SourceSpan};
2use chrono::{DateTime, TimeZone, Utc};
3use serde_json::{Map, Value};
4
5#[derive(Clone, Debug, Default)]
6pub struct RateLimitLineContext<'a> {
7 pub session_id: &'a str,
8 pub account_id: Option<&'a str>,
9 pub source: Option<SourceSpan>,
10}
11
12pub fn parse_rate_limit_line(
13 line: &str,
14 context: RateLimitLineContext<'_>,
15 diagnostics: &mut RateLimitParseDiagnostics,
16) -> Vec<RateLimitSample> {
17 let value = match serde_json::from_str::<Value>(line) {
18 Ok(value) => value,
19 Err(_) => {
20 diagnostics.invalid_json_lines += 1;
21 return Vec::new();
22 }
23 };
24
25 let Some(event) = value.as_object() else {
26 return Vec::new();
27 };
28 if string_field(event, "type").as_deref() != Some("event_msg") {
29 return Vec::new();
30 }
31
32 let Some(payload) = object_field(event, "payload") else {
33 diagnostics.missing_rate_limits += 1;
34 return Vec::new();
35 };
36 let Some(rate_limits_value) = payload.get("rate_limits") else {
37 diagnostics.missing_rate_limits += 1;
38 return Vec::new();
39 };
40
41 diagnostics.rate_limit_events += 1;
42 if rate_limits_value.is_null() {
43 diagnostics.null_rate_limits += 1;
44 return Vec::new();
45 }
46
47 let Some(rate_limits) = rate_limits_value.as_object() else {
48 diagnostics.missing_windows += 1;
49 return Vec::new();
50 };
51 let Some(timestamp) = event.get("timestamp").and_then(value_to_utc) else {
52 diagnostics.missing_timestamps += 1;
53 return Vec::new();
54 };
55
56 count_unknown_windows(rate_limits, diagnostics);
57
58 let plan_type = string_field(rate_limits, "plan_type");
59 let limit_id = string_field(rate_limits, "limit_id");
60 let mut samples = Vec::new();
61
62 for window_name in ["primary", "secondary"] {
63 let Some(window_value) = rate_limits.get(window_name) else {
64 continue;
65 };
66 let Some(window) = window_value.as_object() else {
67 diagnostics.invalid_window_minutes += 1;
68 continue;
69 };
70 if let Some(sample) = parse_window_sample(
71 timestamp,
72 window_name,
73 window,
74 &context,
75 plan_type.as_deref(),
76 limit_id.as_deref(),
77 diagnostics,
78 ) {
79 samples.push(sample);
80 }
81 }
82
83 if samples.is_empty() {
84 diagnostics.missing_windows += 1;
85 }
86 diagnostics.included_samples += samples.len() as i64;
87 samples
88}
89
90fn parse_window_sample(
91 timestamp: DateTime<Utc>,
92 window_name: &str,
93 window: &Map<String, Value>,
94 context: &RateLimitLineContext<'_>,
95 plan_type: Option<&str>,
96 limit_id: Option<&str>,
97 diagnostics: &mut RateLimitParseDiagnostics,
98) -> Option<RateLimitSample> {
99 let Some(window_minutes) = window.get("window_minutes").and_then(value_to_i64) else {
100 diagnostics.invalid_window_minutes += 1;
101 return None;
102 };
103 if window_minutes <= 0 {
104 diagnostics.invalid_window_minutes += 1;
105 return None;
106 }
107 let Some(used_percent) = window.get("used_percent").and_then(value_to_f64) else {
108 diagnostics.invalid_used_percent += 1;
109 return None;
110 };
111 let Some(resets_at) = window.get("resets_at").and_then(value_to_unix_seconds) else {
112 diagnostics.invalid_resets_at += 1;
113 return None;
114 };
115
116 if !(0.0..=100.0).contains(&used_percent) {
117 diagnostics.out_of_range_percent += 1;
118 }
119
120 Some(RateLimitSample {
121 timestamp,
122 session_id: context.session_id.to_string(),
123 account_id: context.account_id.map(str::to_string),
124 plan_type: plan_type.map(str::to_string),
125 limit_id: limit_id.map(str::to_string),
126 window: window_label(window_name, window_minutes),
127 window_minutes,
128 used_percent,
129 remaining_percent: 100.0 - used_percent,
130 resets_at,
131 source: context.source.clone(),
132 })
133}
134
135fn count_unknown_windows(
136 rate_limits: &Map<String, Value>,
137 diagnostics: &mut RateLimitParseDiagnostics,
138) {
139 for (key, value) in rate_limits {
140 if matches!(
141 key.as_str(),
142 "primary" | "secondary" | "plan_type" | "limit_id"
143 ) {
144 continue;
145 }
146 if value.is_object() {
147 diagnostics.unknown_windows += 1;
148 }
149 }
150}
151
152fn object_field<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a Map<String, Value>> {
153 object.get(key).and_then(Value::as_object)
154}
155
156fn string_field(object: &Map<String, Value>, key: &str) -> Option<String> {
157 object
158 .get(key)
159 .and_then(Value::as_str)
160 .map(str::trim)
161 .filter(|value| !value.is_empty())
162 .map(str::to_string)
163}
164
165fn value_to_f64(value: &Value) -> Option<f64> {
166 let parsed = match value {
167 Value::Number(number) => number.as_f64(),
168 Value::String(value) => value.trim().parse::<f64>().ok(),
169 _ => None,
170 }?;
171
172 parsed.is_finite().then_some(parsed)
173}
174
175fn value_to_i64(value: &Value) -> Option<i64> {
176 match value {
177 Value::Number(number) => number
178 .as_i64()
179 .or_else(|| number.as_u64().and_then(|value| i64::try_from(value).ok()))
180 .or_else(|| {
181 number.as_f64().and_then(|value| {
182 if value.is_finite()
183 && value.fract() == 0.0
184 && value >= i64::MIN as f64
185 && value <= i64::MAX as f64
186 {
187 Some(value as i64)
188 } else {
189 None
190 }
191 })
192 }),
193 Value::String(value) => value.trim().parse::<i64>().ok(),
194 _ => None,
195 }
196}
197
198fn value_to_unix_seconds(value: &Value) -> Option<DateTime<Utc>> {
199 let seconds = value_to_i64(value)?;
200 Utc.timestamp_opt(seconds, 0).single()
201}
202
203fn value_to_utc(value: &Value) -> Option<DateTime<Utc>> {
204 match value {
205 Value::String(value) => DateTime::parse_from_rfc3339(value.trim())
206 .ok()
207 .map(|timestamp| timestamp.with_timezone(&Utc)),
208 Value::Number(_) => {
209 let millis = value_to_i64(value)?;
210 Utc.timestamp_millis_opt(millis).single()
211 }
212 _ => None,
213 }
214}
215
216fn window_label(window_name: &str, window_minutes: i64) -> String {
217 match window_minutes {
218 300 => "5h".to_string(),
219 10080 => "7d".to_string(),
220 _ => window_name.to_string(),
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::*;
227 use serde_json::json;
228
229 #[test]
230 fn parses_primary_and_secondary_rate_limit_samples() {
231 let line = json!({
232 "timestamp": "2026-05-10T09:00:01.500Z",
233 "type": "event_msg",
234 "payload": {
235 "rate_limits": {
236 "primary": {
237 "window_minutes": 300,
238 "used_percent": 42.5,
239 "resets_at": 1778421600
240 },
241 "secondary": {
242 "window_minutes": 10080,
243 "used_percent": 84.0,
244 "resets_at": 1778490000
245 },
246 "plan_type": "pro",
247 "limit_id": "fixture-alpha-pre-reset"
248 }
249 }
250 })
251 .to_string();
252 let mut diagnostics = RateLimitParseDiagnostics::default();
253
254 let samples = parse_rate_limit_line(
255 &line,
256 RateLimitLineContext {
257 session_id: "rust-run-session-alpha",
258 account_id: Some("account-fixture"),
259 source: Some(SourceSpan {
260 path: "/tmp/session.jsonl".to_string(),
261 line_number: 3,
262 }),
263 },
264 &mut diagnostics,
265 );
266
267 assert_eq!(samples.len(), 2);
268 assert_eq!(diagnostics.rate_limit_events, 1);
269 assert_eq!(diagnostics.included_samples, 2);
270
271 let primary = &samples[0];
272 assert_eq!(primary.session_id, "rust-run-session-alpha");
273 assert_eq!(primary.account_id.as_deref(), Some("account-fixture"));
274 assert_eq!(primary.plan_type.as_deref(), Some("pro"));
275 assert_eq!(primary.limit_id.as_deref(), Some("fixture-alpha-pre-reset"));
276 assert_eq!(primary.window, "5h");
277 assert_eq!(primary.window_minutes, 300);
278 assert_eq!(primary.used_percent, 42.5);
279 assert_eq!(primary.remaining_percent, 57.5);
280 assert_eq!(
281 primary.resets_at,
282 Utc.timestamp_opt(1778421600, 0).single().unwrap()
283 );
284 assert_eq!(
285 primary.source.as_ref().map(|source| source.path.as_str()),
286 Some("/tmp/session.jsonl")
287 );
288 assert_eq!(
289 primary.source.as_ref().map(|source| source.line_number),
290 Some(3)
291 );
292
293 let secondary = &samples[1];
294 assert_eq!(secondary.window, "7d");
295 assert_eq!(secondary.window_minutes, 10080);
296 assert_eq!(secondary.used_percent, 84.0);
297
298 let serialized = serde_json::to_value(primary).expect("sample json");
299 assert!(serialized.get("source").is_none());
300 assert!(serialized.get("sourcePath").is_none());
301 assert!(serialized.get("lineNumber").is_none());
302 for key in [
303 "timestamp",
304 "sessionId",
305 "accountId",
306 "planType",
307 "limitId",
308 "window",
309 "windowMinutes",
310 "usedPercent",
311 "remainingPercent",
312 "resetsAt",
313 ] {
314 assert!(serialized.get(key).is_some(), "missing key {key}");
315 }
316 }
317
318 #[test]
319 fn parses_primary_only_sample_without_missing_window_diagnostic() {
320 let line = json!({
321 "timestamp": "2026-05-12T13:05:00.000Z",
322 "type": "event_msg",
323 "payload": {
324 "rate_limits": {
325 "primary": {
326 "window_minutes": 300,
327 "used_percent": 18.0,
328 "resets_at": 1778605200
329 },
330 "plan_type": "plus"
331 }
332 }
333 })
334 .to_string();
335 let mut diagnostics = RateLimitParseDiagnostics::default();
336
337 let samples = parse_rate_limit_line(
338 &line,
339 RateLimitLineContext {
340 session_id: "rust-run-session-delta",
341 account_id: None,
342 source: None,
343 },
344 &mut diagnostics,
345 );
346
347 assert_eq!(samples.len(), 1);
348 assert_eq!(samples[0].account_id, None);
349 assert_eq!(samples[0].plan_type.as_deref(), Some("plus"));
350 assert_eq!(diagnostics.missing_windows, 0);
351 }
352
353 #[test]
354 fn null_rate_limits_are_counted_and_skipped() {
355 let line = json!({
356 "timestamp": "2026-05-12T12:10:00.000Z",
357 "type": "event_msg",
358 "payload": {
359 "rate_limits": null
360 }
361 })
362 .to_string();
363 let mut diagnostics = RateLimitParseDiagnostics::default();
364
365 let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);
366
367 assert!(samples.is_empty());
368 assert_eq!(diagnostics.rate_limit_events, 1);
369 assert_eq!(diagnostics.null_rate_limits, 1);
370 }
371
372 #[test]
373 fn missing_rate_limits_are_counted_on_event_payloads() {
374 let line = json!({
375 "timestamp": "2026-05-12T12:10:00.000Z",
376 "type": "event_msg",
377 "payload": {}
378 })
379 .to_string();
380 let mut diagnostics = RateLimitParseDiagnostics::default();
381
382 let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);
383
384 assert!(samples.is_empty());
385 assert_eq!(diagnostics.missing_rate_limits, 1);
386 assert_eq!(diagnostics.rate_limit_events, 0);
387 }
388
389 #[test]
390 fn invalid_json_counts_but_non_object_json_and_non_event_lines_are_skipped() {
391 let mut diagnostics = RateLimitParseDiagnostics::default();
392
393 assert!(parse_rate_limit_line("{not-json", default_context(), &mut diagnostics).is_empty());
394 assert!(parse_rate_limit_line("[]", default_context(), &mut diagnostics).is_empty());
395 assert!(parse_rate_limit_line(
396 &json!({"type": "session_meta"}).to_string(),
397 default_context(),
398 &mut diagnostics
399 )
400 .is_empty());
401
402 assert_eq!(diagnostics.invalid_json_lines, 1);
403 assert_eq!(diagnostics.rate_limit_events, 0);
404 }
405
406 #[test]
407 fn invalid_window_fields_are_counted_without_samples() {
408 let line = json!({
409 "timestamp": "2026-05-12T12:10:00.000Z",
410 "type": "event_msg",
411 "payload": {
412 "rate_limits": {
413 "primary": {
414 "used_percent": 10.0,
415 "resets_at": 1778605200
416 },
417 "secondary": {
418 "window_minutes": 10080,
419 "used_percent": "not-percent",
420 "resets_at": "not-reset"
421 },
422 "tertiary": {
423 "window_minutes": 60,
424 "used_percent": 1.0,
425 "resets_at": 1778605200
426 }
427 }
428 }
429 })
430 .to_string();
431 let mut diagnostics = RateLimitParseDiagnostics::default();
432
433 let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);
434
435 assert!(samples.is_empty());
436 assert_eq!(diagnostics.invalid_window_minutes, 1);
437 assert_eq!(diagnostics.invalid_used_percent, 1);
438 assert_eq!(diagnostics.invalid_resets_at, 0);
439 assert_eq!(diagnostics.unknown_windows, 1);
440 assert_eq!(diagnostics.missing_windows, 1);
441 }
442
443 #[test]
444 fn zero_window_minutes_are_counted_without_samples() {
445 let line = json!({
446 "timestamp": "2026-05-12T12:10:00.000Z",
447 "type": "event_msg",
448 "payload": {
449 "rate_limits": {
450 "primary": {
451 "window_minutes": 0,
452 "used_percent": 10.0,
453 "resets_at": 1778605200
454 }
455 }
456 }
457 })
458 .to_string();
459 let mut diagnostics = RateLimitParseDiagnostics::default();
460
461 let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);
462
463 assert!(samples.is_empty());
464 assert_eq!(diagnostics.invalid_window_minutes, 1);
465 assert_eq!(diagnostics.invalid_used_percent, 0);
466 assert_eq!(diagnostics.invalid_resets_at, 0);
467 }
468
469 #[test]
470 fn invalid_resets_at_is_counted_after_valid_percent() {
471 let line = json!({
472 "timestamp": "2026-05-12T12:10:00.000Z",
473 "type": "event_msg",
474 "payload": {
475 "rate_limits": {
476 "primary": {
477 "window_minutes": 300,
478 "used_percent": 10.0,
479 "resets_at": "not-reset"
480 }
481 }
482 }
483 })
484 .to_string();
485 let mut diagnostics = RateLimitParseDiagnostics::default();
486
487 let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);
488
489 assert!(samples.is_empty());
490 assert_eq!(diagnostics.invalid_resets_at, 1);
491 assert_eq!(diagnostics.missing_windows, 1);
492 }
493
494 #[test]
495 fn out_of_range_percent_is_preserved_and_flagged() {
496 let line = json!({
497 "timestamp": "2026-05-12T12:10:00.000Z",
498 "type": "event_msg",
499 "payload": {
500 "rate_limits": {
501 "primary": {
502 "window_minutes": "300",
503 "used_percent": 125.0,
504 "resets_at": "1778605200"
505 }
506 }
507 }
508 })
509 .to_string();
510 let mut diagnostics = RateLimitParseDiagnostics::default();
511
512 let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);
513
514 assert_eq!(samples.len(), 1);
515 assert_eq!(samples[0].used_percent, 125.0);
516 assert_eq!(samples[0].remaining_percent, -25.0);
517 assert_eq!(diagnostics.out_of_range_percent, 1);
518 }
519
520 #[test]
521 fn missing_or_invalid_timestamp_skips_samples() {
522 let line = json!({
523 "timestamp": "not-a-date",
524 "type": "event_msg",
525 "payload": {
526 "rate_limits": {
527 "primary": {
528 "window_minutes": 300,
529 "used_percent": 10.0,
530 "resets_at": 1778605200
531 }
532 }
533 }
534 })
535 .to_string();
536 let mut diagnostics = RateLimitParseDiagnostics::default();
537
538 let samples = parse_rate_limit_line(&line, default_context(), &mut diagnostics);
539
540 assert!(samples.is_empty());
541 assert_eq!(diagnostics.rate_limit_events, 1);
542 assert_eq!(diagnostics.missing_timestamps, 1);
543 }
544
545 fn default_context<'a>() -> RateLimitLineContext<'a> {
546 RateLimitLineContext {
547 session_id: "session-fixture",
548 account_id: None,
549 source: None,
550 }
551 }
552}