1use super::events::{parse_rate_limit_line, RateLimitLineContext};
2use super::reports::{
3 RateLimitDiagnostics, RateLimitParseDiagnostics, RateLimitSample, RateLimitSamplesReadOptions,
4 RateLimitSamplesReport, SourceSpan,
5};
6use crate::account_history::{self, UsageAccountHistory};
7use crate::error::AppError;
8use crate::session_scan::{
9 prepare_session_scan, session_id_from_path, PreparedSessionFile, SessionScanDiagnostics,
10 SessionScanOptions, SESSION_READ_BUFFER_SIZE,
11};
12use crate::storage::path_to_string;
13use crate::time::DateRange;
14use chrono::{DateTime, TimeZone, Utc};
15use serde_json::Value;
16use std::fs::File;
17use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
18use std::path::Path;
19
/// File-read concurrency value handed to `RateLimitDiagnostics::new`.
///
/// It is 1 because `read_rate_limit_samples_report` reads the prepared session
/// files one after another in a plain loop.
const RATE_LIMIT_FILE_READ_CONCURRENCY: i64 = 1;
21
22pub fn read_rate_limit_samples_report(
23 options: &RateLimitSamplesReadOptions,
24) -> Result<RateLimitSamplesReport, AppError> {
25 let account_history = match &options.account_history_file {
26 Some(path) => account_history::read_optional_usage_account_history(path)?,
27 None => None,
28 };
29 let range = DateRange {
30 start: options.start,
31 end: options.end,
32 };
33 let mut diagnostics =
34 RateLimitDiagnostics::new(RATE_LIMIT_FILE_READ_CONCURRENCY, options.scan_all_files);
35 let prepared = prepare_session_scan(
36 SessionScanOptions {
37 sessions_dir: &options.sessions_dir,
38 range,
39 scan_all_files: options.scan_all_files,
40 },
41 read_last_rate_limit_timestamp,
42 )?;
43 merge_session_scan(&mut diagnostics, &prepared.diagnostics);
44
45 let mut samples = Vec::new();
46 for file in &prepared.files {
47 let mut file_samples = read_rate_limit_samples_from_file(
48 file,
49 range,
50 account_history.as_ref(),
51 options,
52 &mut diagnostics,
53 )?;
54 samples.append(&mut file_samples);
55 }
56
57 samples.sort_by(|left, right| {
58 left.timestamp
59 .cmp(&right.timestamp)
60 .then_with(|| left.session_id.cmp(&right.session_id))
61 .then_with(|| left.window_minutes.cmp(&right.window_minutes))
62 .then_with(|| left.limit_id.cmp(&right.limit_id))
63 });
64
65 Ok(RateLimitSamplesReport {
66 start: options.start,
67 end: options.end,
68 sessions_dir: path_to_string(&options.sessions_dir),
69 samples,
70 diagnostics,
71 })
72}
73
74fn merge_session_scan(diagnostics: &mut RateLimitDiagnostics, session: &SessionScanDiagnostics) {
75 diagnostics.scanned_directories += session.scanned_directories;
76 diagnostics.skipped_directories += session.skipped_directories;
77 diagnostics.read_files += session.read_files;
78 diagnostics.skipped_files += session.skipped_files;
79 diagnostics.prefiltered_files += session.prefiltered_files;
80 diagnostics.tail_read_files += session.tail_read_files;
81 diagnostics.tail_read_hits += session.tail_read_hits;
82 diagnostics.mtime_read_files += session.mtime_read_files;
83 diagnostics.mtime_tail_hits += session.mtime_tail_hits;
84 diagnostics.mtime_read_hits += session.mtime_read_hits;
85 diagnostics.fork_files += session.fork_files;
86 diagnostics.fork_parent_missing += session.fork_parent_missing;
87 diagnostics.fork_replay_lines += session.fork_replay_lines;
88}
89
90fn read_rate_limit_samples_from_file(
91 session_file: &PreparedSessionFile,
92 range: DateRange,
93 account_history: Option<&UsageAccountHistory>,
94 options: &RateLimitSamplesReadOptions,
95 diagnostics: &mut RateLimitDiagnostics,
96) -> Result<Vec<RateLimitSample>, AppError> {
97 let path = &session_file.path;
98 let file_handle = File::open(path).map_err(|error| AppError::new(error.to_string()))?;
99 let mut reader = BufReader::with_capacity(SESSION_READ_BUFFER_SIZE, file_handle);
100 let mut line = String::new();
101 let mut line_number = 0_usize;
102 let mut session_id = session_file
103 .current_session_id
104 .clone()
105 .unwrap_or_else(|| session_id_from_path(path));
106 let file_path = path_to_string(path);
107 let mut file_has_rate_limits = false;
108 let mut file_has_token_count = false;
109 let mut samples = Vec::new();
110
111 loop {
112 line.clear();
113 let bytes_read = reader
114 .read_line(&mut line)
115 .map_err(|error| AppError::new(error.to_string()))?;
116 if bytes_read == 0 {
117 break;
118 }
119
120 line_number += 1;
121 diagnostics.read_lines += 1;
122 if line.contains("\"token_count\"") {
123 file_has_token_count = true;
124 }
125
126 if !line.contains("\"rate_limits\"")
127 && !line.contains("\"session_meta\"")
128 && !line.contains("\"turn_context\"")
129 {
130 continue;
131 }
132
133 let is_fork_replay_line = session_file.replay_prefix_lines > 0
134 && line_number > 1
135 && line_number <= session_file.replay_prefix_lines;
136 if is_fork_replay_line {
137 diagnostics.fork_replay_lines_skipped += 1;
138 continue;
139 }
140
141 if line.contains("\"session_meta\"") || line.contains("\"turn_context\"") {
142 match metadata_event(&line) {
143 Ok(Some(metadata)) => {
144 if metadata.event_type == "session_meta"
145 && session_file.current_session_id.is_none()
146 {
147 if let Some(next_session_id) = metadata.session_id {
148 session_id = next_session_id;
149 }
150 }
151 }
152 Ok(None) => {}
153 Err(_) => diagnostics.invalid_json_lines += 1,
154 }
155 }
156
157 if !line.contains("\"rate_limits\"") {
158 continue;
159 }
160 file_has_rate_limits = true;
161 let account_id = resolve_rate_limit_account_id(timestamp_from_line(&line), account_history);
162 let mut parse_diagnostics = RateLimitParseDiagnostics::default();
163 let mut parsed = parse_rate_limit_line(
164 &line,
165 RateLimitLineContext {
166 session_id: &session_id,
167 account_id: account_id.as_deref(),
168 source: Some(SourceSpan {
169 path: file_path.clone(),
170 line_number,
171 }),
172 },
173 &mut parse_diagnostics,
174 );
175 diagnostics.merge_parse(&parse_diagnostics);
176
177 for sample in parsed.drain(..) {
178 if !sample_in_range(&sample, range) {
179 diagnostics.out_of_range_samples += 1;
180 continue;
181 }
182 if let Some(filter) = options.account_id.as_deref() {
183 if sample.account_id.as_deref() != Some(filter) {
184 diagnostics.account_mismatches += 1;
185 continue;
186 }
187 }
188 if let Some(filter) = options.plan_type.as_deref() {
189 if sample.plan_type.as_deref() != Some(filter) {
190 diagnostics.plan_mismatches += 1;
191 continue;
192 }
193 }
194 if let Some(filter) = options.window_minutes {
195 if sample.window_minutes != filter {
196 diagnostics.window_mismatches += 1;
197 continue;
198 }
199 }
200 if let Some(source) = sample.source.clone() {
201 diagnostics.source_spans.push(source);
202 }
203 diagnostics.included_samples += 1;
204 samples.push(sample);
205 }
206 }
207
208 if file_has_rate_limits && !file_has_token_count {
209 diagnostics.rate_limit_only_files += 1;
210 }
211
212 Ok(samples)
213}
214
/// Fields that `metadata_event` extracts from a `session_meta` or
/// `turn_context` line.
#[derive(Debug)]
struct MetadataEvent {
    /// Value of the line's top-level `type` field.
    event_type: String,
    /// Trimmed, non-empty `payload.id` string, when the line carries one.
    session_id: Option<String>,
}
220
221fn metadata_event(line: &str) -> Result<Option<MetadataEvent>, serde_json::Error> {
222 let value = serde_json::from_str::<Value>(line)?;
223 let Some(object) = value.as_object() else {
224 return Ok(None);
225 };
226 let Some(event_type) = object
227 .get("type")
228 .and_then(Value::as_str)
229 .map(str::to_string)
230 else {
231 return Ok(None);
232 };
233 if event_type != "session_meta" && event_type != "turn_context" {
234 return Ok(None);
235 }
236 let session_id = object
237 .get("payload")
238 .and_then(Value::as_object)
239 .and_then(|payload| payload.get("id"))
240 .and_then(Value::as_str)
241 .map(str::trim)
242 .filter(|value| !value.is_empty())
243 .map(str::to_string);
244
245 Ok(Some(MetadataEvent {
246 event_type,
247 session_id,
248 }))
249}
250
251fn resolve_rate_limit_account_id(
252 timestamp: Option<DateTime<Utc>>,
253 history: Option<&UsageAccountHistory>,
254) -> Option<String> {
255 history.and_then(|history| timestamp.and_then(|timestamp| history.account_id_at(timestamp)))
256}
257
258fn sample_in_range(sample: &RateLimitSample, range: DateRange) -> bool {
259 sample.timestamp >= range.start && sample.timestamp <= range.end
260}
261
262fn read_last_rate_limit_timestamp(path: &Path) -> Result<Option<DateTime<Utc>>, AppError> {
263 let mut file = File::open(path).map_err(|error| AppError::new(error.to_string()))?;
264 let mut position = file
265 .seek(SeekFrom::End(0))
266 .map_err(|error| AppError::new(error.to_string()))?;
267 let mut buffer = vec![0_u8; SESSION_READ_BUFFER_SIZE];
268 let mut carry = Vec::new();
269
270 while position > 0 {
271 let read_len = (position as usize).min(buffer.len());
272 position -= read_len as u64;
273 file.seek(SeekFrom::Start(position))
274 .map_err(|error| AppError::new(error.to_string()))?;
275 file.read_exact(&mut buffer[..read_len])
276 .map_err(|error| AppError::new(error.to_string()))?;
277
278 let mut combined = Vec::with_capacity(read_len + carry.len());
279 combined.extend_from_slice(&buffer[..read_len]);
280 combined.extend_from_slice(&carry);
281
282 if position > 0 {
283 let Some(newline_index) = combined.iter().position(|byte| *byte == b'\n') else {
284 carry = combined;
285 continue;
286 };
287
288 if let Some(timestamp) = last_rate_limit_timestamp_in_lines(
289 combined
290 .get(newline_index + 1..)
291 .expect("newline index is within combined bytes"),
292 ) {
293 return Ok(Some(timestamp));
294 }
295
296 carry.clear();
297 carry.extend_from_slice(
298 combined
299 .get(..newline_index)
300 .expect("newline index is within combined bytes"),
301 );
302 } else if let Some(timestamp) = last_rate_limit_timestamp_in_lines(&combined) {
303 return Ok(Some(timestamp));
304 }
305 }
306
307 Ok(None)
308}
309
310fn last_rate_limit_timestamp_in_lines(bytes: &[u8]) -> Option<DateTime<Utc>> {
311 for line in bytes.split(|byte| *byte == b'\n').rev() {
312 let line = trim_line_end_bytes(line);
313 if line.is_empty() || !line_contains_bytes(line, b"\"rate_limits\"") {
314 continue;
315 };
316
317 let Ok(line) = std::str::from_utf8(line) else {
318 continue;
319 };
320 if let Some(timestamp) = timestamp_from_line(line) {
321 return Some(timestamp);
322 }
323 }
324
325 None
326}
327
328fn timestamp_from_line(line: &str) -> Option<DateTime<Utc>> {
329 let value = serde_json::from_str::<Value>(line).ok()?;
330 let object = value.as_object()?;
331 object.get("timestamp").and_then(value_to_utc)
332}
333
334fn value_to_utc(value: &Value) -> Option<DateTime<Utc>> {
335 match value {
336 Value::String(value) => DateTime::parse_from_rfc3339(value.trim())
337 .ok()
338 .map(|timestamp| timestamp.with_timezone(&Utc)),
339 Value::Number(number) => {
340 let millis = number
341 .as_i64()
342 .or_else(|| number.as_u64().and_then(|value| i64::try_from(value).ok()))?;
343 Utc.timestamp_millis_opt(millis).single()
344 }
345 _ => None,
346 }
347}
348
/// Drops a single trailing carriage return from `line`, if there is one.
fn trim_line_end_bytes(line: &[u8]) -> &[u8] {
    match line.split_last() {
        Some((&b'\r', rest)) => rest,
        _ => line,
    }
}
352
/// Reports whether `needle` occurs as a contiguous byte sequence in `line`.
///
/// An empty `needle` is contained in every line, matching `str::contains`.
fn line_contains_bytes(line: &[u8], needle: &[u8]) -> bool {
    // `slice::windows` panics on a window size of zero, so the empty needle
    // has to be answered before it is used as the window size.
    needle.is_empty() || line.windows(needle.len()).any(|window| window == needle)
}
356
#[cfg(test)]
mod tests {
    use super::*;
    use crate::account_history::format_account_history_iso;
    use crate::account_history::{
        AccountHistoryAccount, AccountHistoryStore, AccountHistorySwitchEvent,
        ACCOUNT_HISTORY_STORE_VERSION, AUTH_SELECT_SOURCE, DEFAULT_ACCOUNT_SOURCE,
    };
    use chrono::TimeZone;
    use serde_json::json;
    use std::io::Write;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Unfiltered scan of the checked-in fixture: pins the total sample count,
    /// the ordering by timestamp, the split between the 300- and 10080-minute
    /// windows and the diagnostics counters.
    #[test]
    fn fixture_scan_returns_sorted_5h_and_7d_samples_with_diagnostics() {
        let fixture = fixture_codex_home();

        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
            start: utc_time(2026, 5, 10, 0, 0),
            end: utc_time(2026, 5, 12, 14, 0),
            sessions_dir: fixture.join("sessions"),
            scan_all_files: true,
            account_history_file: Some(fixture.join("codex-ops/auth-account-history.json")),
            account_id: None,
            plan_type: None,
            window_minutes: None,
        })
        .expect("read limit samples");

        assert_eq!(report.samples.len(), 11);
        // Samples must come back in non-decreasing timestamp order.
        assert!(report
            .samples
            .windows(2)
            .all(|pair| pair[0].timestamp <= pair[1].timestamp));
        assert_eq!(
            report
                .samples
                .iter()
                .filter(|sample| sample.window_minutes == 300)
                .count(),
            6
        );
        assert_eq!(
            report
                .samples
                .iter()
                .filter(|sample| sample.window_minutes == 10080)
                .count(),
            5
        );
        assert_eq!(report.diagnostics.null_rate_limits, 1);
        assert_eq!(report.diagnostics.rate_limit_only_files, 2);
        assert_eq!(report.diagnostics.rate_limit_events, 7);
        assert_eq!(report.diagnostics.included_samples, 11);
        assert_eq!(report.diagnostics.file_read_concurrency, 1);
        assert_eq!(report.diagnostics.source_spans.len(), 11);
        assert!(report
            .samples
            .iter()
            .any(|sample| sample.account_id.as_deref() == Some("account-other")));
    }

    /// Applies the account, plan and window filters one at a time to the same
    /// fixture scan and checks both the surviving samples and the matching
    /// mismatch counter.
    #[test]
    fn account_plan_and_window_filters_are_applied_after_attribution() {
        let fixture = fixture_codex_home();
        let base = RateLimitSamplesReadOptions {
            start: utc_time(2026, 5, 10, 0, 0),
            end: utc_time(2026, 5, 12, 14, 0),
            sessions_dir: fixture.join("sessions"),
            scan_all_files: true,
            account_history_file: Some(fixture.join("codex-ops/auth-account-history.json")),
            account_id: None,
            plan_type: None,
            window_minutes: None,
        };

        let mut fixture_account = base.clone();
        fixture_account.account_id = Some("account-fixture".to_string());
        let report = read_rate_limit_samples_report(&fixture_account).expect("account filter");
        assert_eq!(report.samples.len(), 10);
        assert_eq!(report.diagnostics.account_mismatches, 1);

        let mut other_account = base.clone();
        other_account.account_id = Some("account-other".to_string());
        let report = read_rate_limit_samples_report(&other_account).expect("other account filter");
        assert_eq!(report.samples.len(), 1);
        assert_eq!(report.samples[0].plan_type.as_deref(), Some("plus"));
        assert_eq!(report.diagnostics.account_mismatches, 10);

        let mut plan = base.clone();
        plan.plan_type = Some("plus".to_string());
        let report = read_rate_limit_samples_report(&plan).expect("plan filter");
        assert_eq!(report.samples.len(), 1);
        assert_eq!(report.diagnostics.plan_mismatches, 10);

        let mut window = base;
        window.window_minutes = Some(10080);
        let report = read_rate_limit_samples_report(&window).expect("window filter");
        assert_eq!(report.samples.len(), 5);
        assert_eq!(report.diagnostics.window_mismatches, 6);
    }

    /// With an account filter but no account history file, no sample matches
    /// the filter and all 11 are counted as account mismatches.
    #[test]
    fn account_filter_without_history_treats_samples_as_unknown_mismatches() {
        let fixture = fixture_codex_home();

        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
            start: utc_time(2026, 5, 10, 0, 0),
            end: utc_time(2026, 5, 12, 14, 0),
            sessions_dir: fixture.join("sessions"),
            scan_all_files: true,
            account_history_file: None,
            account_id: Some("account-fixture".to_string()),
            plan_type: None,
            window_minutes: None,
        })
        .expect("read limit samples");

        assert!(report.samples.is_empty());
        assert_eq!(report.diagnostics.account_mismatches, 11);
    }

    /// Writes two files that contain only a rate-limit line, one with a
    /// timestamp inside the requested range and one just before it, and checks
    /// the tail-read and prefilter counters of a non-exhaustive scan.
    #[test]
    fn tail_prefilter_reads_rate_limit_only_files_without_token_count() {
        let temp = TempDir::new().expect("tempdir");
        let sessions_dir = temp.path().join("sessions");
        write_session_file(
            &sessions_dir,
            2026,
            5,
            10,
            "rollout-2026-05-10T00-00-00-rate-limit-only.jsonl",
            &[rate_limit_line(
                "2026-05-12T00:00:01.000Z",
                300,
                12.0,
                1778605200,
            )],
        );
        // This file's only timestamp is one second before the range start.
        write_session_file(
            &sessions_dir,
            2026,
            5,
            9,
            "rollout-2026-05-09T00-00-00-stale-rate-limit.jsonl",
            &[rate_limit_line(
                "2026-05-11T23:59:59.000Z",
                300,
                92.0,
                1778605200,
            )],
        );

        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
            start: utc_time(2026, 5, 12, 0, 0),
            end: utc_time(2026, 5, 12, 1, 0),
            sessions_dir,
            scan_all_files: false,
            account_history_file: None,
            account_id: None,
            plan_type: None,
            window_minutes: None,
        })
        .expect("read limit samples");

        assert_eq!(report.samples.len(), 1);
        assert_eq!(report.samples[0].used_percent, 12.0);
        assert_eq!(report.diagnostics.tail_read_files, 2);
        assert_eq!(report.diagnostics.tail_read_hits, 1);
        assert_eq!(report.diagnostics.prefiltered_files, 1);
        assert_eq!(report.diagnostics.rate_limit_only_files, 1);
    }

    /// A child session file starts with its own `session_meta` line, then
    /// repeats the parent's lines with a new timestamp, then adds its own
    /// rate-limit line. Only that last line may count for the child.
    #[test]
    fn fork_replayed_rate_limits_are_not_counted_for_child_sessions() {
        let temp = TempDir::new().expect("tempdir");
        let sessions_dir = temp.path().join("sessions");
        let parent = "019e5110-1f92-7280-b02a-0876af32b81f";
        let child = "019e5111-2720-73d0-8519-4c80dffbe80e";
        let parent_lines = vec![
            session_meta_line("2026-05-21T00:00:00.000Z", parent),
            rate_limit_line("2026-05-21T00:00:01.000Z", 300, 10.0, 1779469200),
        ];
        let mut child_lines = vec![session_meta_line("2026-05-21T00:10:00.000Z", child)];
        child_lines.extend(retimestamp_lines(&parent_lines, "2026-05-21T00:10:00.001Z"));
        child_lines.push(rate_limit_line(
            "2026-05-21T00:10:01.000Z",
            300,
            11.0,
            1779469200,
        ));
        write_session_file(
            &sessions_dir,
            2026,
            5,
            21,
            &format!("rollout-2026-05-21T00-00-00-{parent}.jsonl"),
            &parent_lines,
        );
        write_session_file(
            &sessions_dir,
            2026,
            5,
            21,
            &format!("rollout-2026-05-21T00-10-00-{child}.jsonl"),
            &child_lines,
        );

        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
            start: utc_time(2026, 5, 21, 0, 0),
            end: utc_time(2026, 5, 21, 1, 0),
            sessions_dir,
            scan_all_files: false,
            account_history_file: None,
            account_id: None,
            plan_type: None,
            window_minutes: None,
        })
        .expect("read limit samples");

        // One sample from the parent file plus one from the child file.
        assert_eq!(report.samples.len(), 2);
        assert_eq!(
            report
                .samples
                .iter()
                .filter(|sample| sample.session_id == child)
                .count(),
            1
        );
        assert_eq!(report.diagnostics.rate_limit_events, 2);
        assert_eq!(report.diagnostics.fork_replay_lines_skipped, 2);
    }

    /// Serializes the samples and the diagnostics of a fixture scan to JSON
    /// and checks that none of the source-location keys appear anywhere.
    #[test]
    fn serialized_samples_report_hides_source_evidence_by_default() {
        let fixture = fixture_codex_home();
        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
            start: utc_time(2026, 5, 10, 0, 0),
            end: utc_time(2026, 5, 12, 14, 0),
            sessions_dir: fixture.join("sessions"),
            scan_all_files: true,
            account_history_file: Some(fixture.join("codex-ops/auth-account-history.json")),
            account_id: None,
            plan_type: None,
            window_minutes: None,
        })
        .expect("read limit samples");

        let value = serde_json::to_value(&report.samples).expect("sample json");
        assert_no_source_evidence(&value);
        let diagnostics = serde_json::to_value(&report.diagnostics).expect("diagnostics json");
        assert_no_source_evidence(&diagnostics);
    }

    /// Uses a history store written by `write_account_history`, whose single
    /// switch happens at 12:30: the 10:00 sample must be attributed to
    /// `account-fixture` and the 13:00 sample to `account-other`.
    #[test]
    fn scanner_accepts_synthetic_account_history_store_for_attribution() {
        let temp = TempDir::new().expect("tempdir");
        let sessions_dir = temp.path().join("sessions");
        let history_file = temp.path().join("account-history.json");
        write_account_history(&history_file);
        write_session_file(
            &sessions_dir,
            2026,
            5,
            12,
            "rollout-2026-05-12T10-00-00-before-switch.jsonl",
            &[rate_limit_line(
                "2026-05-12T10:00:01.000Z",
                300,
                20.0,
                1778605200,
            )],
        );
        write_session_file(
            &sessions_dir,
            2026,
            5,
            12,
            "rollout-2026-05-12T13-00-00-after-switch.jsonl",
            &[rate_limit_line(
                "2026-05-12T13:00:01.000Z",
                300,
                22.0,
                1778605200,
            )],
        );

        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
            start: utc_time(2026, 5, 12, 0, 0),
            end: utc_time(2026, 5, 12, 14, 0),
            sessions_dir,
            scan_all_files: false,
            account_history_file: Some(history_file),
            account_id: None,
            plan_type: None,
            window_minutes: None,
        })
        .expect("read limit samples");

        let accounts = report
            .samples
            .iter()
            .map(|sample| sample.account_id.as_deref())
            .collect::<Vec<_>>();
        assert_eq!(
            accounts,
            vec![Some("account-fixture"), Some("account-other")]
        );
    }

    /// Path of the checked-in fixture home directory, resolved relative to the
    /// crate's manifest directory.
    fn fixture_codex_home() -> PathBuf {
        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/fixtures/rust-run/codex-home")
    }

    /// Creates `root/YYYY/MM/DD/file_name`, writes every entry of `lines`
    /// followed by a newline, and returns the path of the new file.
    fn write_session_file(
        root: &Path,
        year: i32,
        month: u32,
        day: u32,
        file_name: &str,
        lines: &[String],
    ) -> PathBuf {
        let dir = root
            .join(format!("{year:04}"))
            .join(format!("{month:02}"))
            .join(format!("{day:02}"));
        std::fs::create_dir_all(&dir).expect("create session dir");
        let path = dir.join(file_name);
        let mut file = std::fs::File::create(&path).expect("create session file");
        for line in lines {
            writeln!(file, "{line}").expect("write session line");
        }
        path
    }

    /// Builds a single-line `session_meta` JSON event whose payload id is
    /// `session_id`.
    fn session_meta_line(timestamp: &str, session_id: &str) -> String {
        json!({
            "timestamp": timestamp,
            "type": "session_meta",
            "payload": {
                "id": session_id,
                "model": "gpt-5.5",
                "cwd": "/workspace/limit-scan-test"
            }
        })
        .to_string()
    }

    /// Builds a single-line `event_msg` JSON event whose payload carries a
    /// `rate_limits` object with one `primary` window, plan type `pro` and the
    /// limit id `fixture-limit`.
    fn rate_limit_line(
        timestamp: &str,
        window_minutes: i64,
        used_percent: f64,
        resets_at: i64,
    ) -> String {
        json!({
            "timestamp": timestamp,
            "type": "event_msg",
            "payload": {
                "rate_limits": {
                    "primary": {
                        "window_minutes": window_minutes,
                        "used_percent": used_percent,
                        "resets_at": resets_at
                    },
                    "plan_type": "pro",
                    "limit_id": "fixture-limit"
                }
            }
        })
        .to_string()
    }

    /// Returns copies of `lines` with the top-level `timestamp` field of each
    /// JSON object replaced by `timestamp`.
    fn retimestamp_lines(lines: &[String], timestamp: &str) -> Vec<String> {
        lines
            .iter()
            .map(|line| {
                let mut value = serde_json::from_str::<Value>(line).expect("json line");
                if let Value::Object(fields) = &mut value {
                    fields.insert(
                        "timestamp".to_string(),
                        Value::String(timestamp.to_string()),
                    );
                }
                serde_json::to_string(&value).expect("json string")
            })
            .collect()
    }

    /// Writes an account history store to `path` with `account-fixture` as the
    /// default account and one switch to `account-other` at 2026-05-12 12:30.
    fn write_account_history(path: &Path) {
        let store = AccountHistoryStore {
            version: ACCOUNT_HISTORY_STORE_VERSION,
            default_account: Some(AccountHistoryAccount {
                account_id: "account-fixture".to_string(),
                observed_at: format_account_history_iso(utc_time(2026, 5, 12, 0, 0)),
                source: DEFAULT_ACCOUNT_SOURCE.to_string(),
                name: None,
                email: None,
                plan_type: Some("pro".to_string()),
            }),
            switches: vec![AccountHistorySwitchEvent {
                timestamp: format_account_history_iso(utc_time(2026, 5, 12, 12, 30)),
                from_account_id: "account-fixture".to_string(),
                to_account_id: "account-other".to_string(),
                source: AUTH_SELECT_SOURCE.to_string(),
            }],
        };
        let content = serde_json::to_string_pretty(&store).expect("history json");
        std::fs::write(path, format!("{content}\n")).expect("write history");
    }

    /// Walks `value` recursively and panics if any object, at any depth,
    /// contains one of the source-location keys.
    fn assert_no_source_evidence(value: &Value) {
        match value {
            Value::Object(object) => {
                for key in ["source", "sourcePath", "sourceLine", "line", "lineNumber"] {
                    assert!(object.get(key).is_none(), "unexpected source key {key}");
                }
                for value in object.values() {
                    assert_no_source_evidence(value);
                }
            }
            Value::Array(values) => {
                for value in values {
                    assert_no_source_evidence(value);
                }
            }
            _ => {}
        }
    }

    /// Builds a UTC date-time at second zero of the given minute; panics when
    /// the components do not form a single valid time.
    fn utc_time(year: i32, month: u32, day: u32, hour: u32, minute: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, minute, 0)
            .single()
            .expect("valid test time")
    }
}