Skip to main content

codex_ops/limits/
scan.rs

1use super::events::{parse_rate_limit_line, RateLimitLineContext};
2use super::reports::{
3    RateLimitDiagnostics, RateLimitParseDiagnostics, RateLimitSample, RateLimitSamplesReadOptions,
4    RateLimitSamplesReport, SourceSpan,
5};
6use crate::account_history::{self, UsageAccountHistory};
7use crate::error::AppError;
8use crate::session_scan::{
9    prepare_session_scan, session_id_from_path, PreparedSessionFile, SessionScanDiagnostics,
10    SessionScanOptions, SESSION_READ_BUFFER_SIZE,
11};
12use crate::storage::path_to_string;
13use crate::time::DateRange;
14use chrono::{DateTime, TimeZone, Utc};
15use serde_json::Value;
16use std::fs::File;
17use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
18use std::path::Path;
19
20const RATE_LIMIT_FILE_READ_CONCURRENCY: i64 = 1;
21
22pub fn read_rate_limit_samples_report(
23    options: &RateLimitSamplesReadOptions,
24) -> Result<RateLimitSamplesReport, AppError> {
25    let account_history = match &options.account_history_file {
26        Some(path) => account_history::read_optional_usage_account_history(path)?,
27        None => None,
28    };
29    let range = DateRange {
30        start: options.start,
31        end: options.end,
32    };
33    let mut diagnostics =
34        RateLimitDiagnostics::new(RATE_LIMIT_FILE_READ_CONCURRENCY, options.scan_all_files);
35    let prepared = prepare_session_scan(
36        SessionScanOptions {
37            sessions_dir: &options.sessions_dir,
38            range,
39            scan_all_files: options.scan_all_files,
40        },
41        read_last_rate_limit_timestamp,
42    )?;
43    merge_session_scan(&mut diagnostics, &prepared.diagnostics);
44
45    let mut samples = Vec::new();
46    for file in &prepared.files {
47        let mut file_samples = read_rate_limit_samples_from_file(
48            file,
49            range,
50            account_history.as_ref(),
51            options,
52            &mut diagnostics,
53        )?;
54        samples.append(&mut file_samples);
55    }
56
57    samples.sort_by(|left, right| {
58        left.timestamp
59            .cmp(&right.timestamp)
60            .then_with(|| left.session_id.cmp(&right.session_id))
61            .then_with(|| left.window_minutes.cmp(&right.window_minutes))
62            .then_with(|| left.limit_id.cmp(&right.limit_id))
63    });
64
65    Ok(RateLimitSamplesReport {
66        start: options.start,
67        end: options.end,
68        sessions_dir: path_to_string(&options.sessions_dir),
69        samples,
70        diagnostics,
71    })
72}
73
74fn merge_session_scan(diagnostics: &mut RateLimitDiagnostics, session: &SessionScanDiagnostics) {
75    diagnostics.scanned_directories += session.scanned_directories;
76    diagnostics.skipped_directories += session.skipped_directories;
77    diagnostics.read_files += session.read_files;
78    diagnostics.skipped_files += session.skipped_files;
79    diagnostics.prefiltered_files += session.prefiltered_files;
80    diagnostics.tail_read_files += session.tail_read_files;
81    diagnostics.tail_read_hits += session.tail_read_hits;
82    diagnostics.mtime_read_files += session.mtime_read_files;
83    diagnostics.mtime_tail_hits += session.mtime_tail_hits;
84    diagnostics.mtime_read_hits += session.mtime_read_hits;
85    diagnostics.fork_files += session.fork_files;
86    diagnostics.fork_parent_missing += session.fork_parent_missing;
87    diagnostics.fork_replay_lines += session.fork_replay_lines;
88}
89
90fn read_rate_limit_samples_from_file(
91    session_file: &PreparedSessionFile,
92    range: DateRange,
93    account_history: Option<&UsageAccountHistory>,
94    options: &RateLimitSamplesReadOptions,
95    diagnostics: &mut RateLimitDiagnostics,
96) -> Result<Vec<RateLimitSample>, AppError> {
97    let path = &session_file.path;
98    let file_handle = File::open(path).map_err(|error| AppError::new(error.to_string()))?;
99    let mut reader = BufReader::with_capacity(SESSION_READ_BUFFER_SIZE, file_handle);
100    let mut line = String::new();
101    let mut line_number = 0_usize;
102    let mut session_id = session_file
103        .current_session_id
104        .clone()
105        .unwrap_or_else(|| session_id_from_path(path));
106    let file_path = path_to_string(path);
107    let mut file_has_rate_limits = false;
108    let mut file_has_token_count = false;
109    let mut samples = Vec::new();
110
111    loop {
112        line.clear();
113        let bytes_read = reader
114            .read_line(&mut line)
115            .map_err(|error| AppError::new(error.to_string()))?;
116        if bytes_read == 0 {
117            break;
118        }
119
120        line_number += 1;
121        diagnostics.read_lines += 1;
122        if line.contains("\"token_count\"") {
123            file_has_token_count = true;
124        }
125
126        if !line.contains("\"rate_limits\"")
127            && !line.contains("\"session_meta\"")
128            && !line.contains("\"turn_context\"")
129        {
130            continue;
131        }
132
133        let is_fork_replay_line = session_file.replay_prefix_lines > 0
134            && line_number > 1
135            && line_number <= session_file.replay_prefix_lines;
136        if is_fork_replay_line {
137            diagnostics.fork_replay_lines_skipped += 1;
138            continue;
139        }
140
141        if line.contains("\"session_meta\"") || line.contains("\"turn_context\"") {
142            match metadata_event(&line) {
143                Ok(Some(metadata)) => {
144                    if metadata.event_type == "session_meta"
145                        && session_file.current_session_id.is_none()
146                    {
147                        if let Some(next_session_id) = metadata.session_id {
148                            session_id = next_session_id;
149                        }
150                    }
151                }
152                Ok(None) => {}
153                Err(_) => diagnostics.invalid_json_lines += 1,
154            }
155        }
156
157        if !line.contains("\"rate_limits\"") {
158            continue;
159        }
160        file_has_rate_limits = true;
161        let account_id = resolve_rate_limit_account_id(timestamp_from_line(&line), account_history);
162        let mut parse_diagnostics = RateLimitParseDiagnostics::default();
163        let mut parsed = parse_rate_limit_line(
164            &line,
165            RateLimitLineContext {
166                session_id: &session_id,
167                account_id: account_id.as_deref(),
168                source: Some(SourceSpan {
169                    path: file_path.clone(),
170                    line_number,
171                }),
172            },
173            &mut parse_diagnostics,
174        );
175        diagnostics.merge_parse(&parse_diagnostics);
176
177        for sample in parsed.drain(..) {
178            if !sample_in_range(&sample, range) {
179                diagnostics.out_of_range_samples += 1;
180                continue;
181            }
182            if let Some(filter) = options.account_id.as_deref() {
183                if sample.account_id.as_deref() != Some(filter) {
184                    diagnostics.account_mismatches += 1;
185                    continue;
186                }
187            }
188            if let Some(filter) = options.plan_type.as_deref() {
189                if sample.plan_type.as_deref() != Some(filter) {
190                    diagnostics.plan_mismatches += 1;
191                    continue;
192                }
193            }
194            if let Some(filter) = options.window_minutes {
195                if sample.window_minutes != filter {
196                    diagnostics.window_mismatches += 1;
197                    continue;
198                }
199            }
200            if let Some(source) = sample.source.clone() {
201                diagnostics.source_spans.push(source);
202            }
203            diagnostics.included_samples += 1;
204            samples.push(sample);
205        }
206    }
207
208    if file_has_rate_limits && !file_has_token_count {
209        diagnostics.rate_limit_only_files += 1;
210    }
211
212    Ok(samples)
213}
214
215#[derive(Debug)]
216struct MetadataEvent {
217    event_type: String,
218    session_id: Option<String>,
219}
220
221fn metadata_event(line: &str) -> Result<Option<MetadataEvent>, serde_json::Error> {
222    let value = serde_json::from_str::<Value>(line)?;
223    let Some(object) = value.as_object() else {
224        return Ok(None);
225    };
226    let Some(event_type) = object
227        .get("type")
228        .and_then(Value::as_str)
229        .map(str::to_string)
230    else {
231        return Ok(None);
232    };
233    if event_type != "session_meta" && event_type != "turn_context" {
234        return Ok(None);
235    }
236    let session_id = object
237        .get("payload")
238        .and_then(Value::as_object)
239        .and_then(|payload| payload.get("id"))
240        .and_then(Value::as_str)
241        .map(str::trim)
242        .filter(|value| !value.is_empty())
243        .map(str::to_string);
244
245    Ok(Some(MetadataEvent {
246        event_type,
247        session_id,
248    }))
249}
250
251fn resolve_rate_limit_account_id(
252    timestamp: Option<DateTime<Utc>>,
253    history: Option<&UsageAccountHistory>,
254) -> Option<String> {
255    history.and_then(|history| timestamp.and_then(|timestamp| history.account_id_at(timestamp)))
256}
257
258fn sample_in_range(sample: &RateLimitSample, range: DateRange) -> bool {
259    sample.timestamp >= range.start && sample.timestamp <= range.end
260}
261
262fn read_last_rate_limit_timestamp(path: &Path) -> Result<Option<DateTime<Utc>>, AppError> {
263    let mut file = File::open(path).map_err(|error| AppError::new(error.to_string()))?;
264    let mut position = file
265        .seek(SeekFrom::End(0))
266        .map_err(|error| AppError::new(error.to_string()))?;
267    let mut buffer = vec![0_u8; SESSION_READ_BUFFER_SIZE];
268    let mut carry = Vec::new();
269
270    while position > 0 {
271        let read_len = (position as usize).min(buffer.len());
272        position -= read_len as u64;
273        file.seek(SeekFrom::Start(position))
274            .map_err(|error| AppError::new(error.to_string()))?;
275        file.read_exact(&mut buffer[..read_len])
276            .map_err(|error| AppError::new(error.to_string()))?;
277
278        let mut combined = Vec::with_capacity(read_len + carry.len());
279        combined.extend_from_slice(&buffer[..read_len]);
280        combined.extend_from_slice(&carry);
281
282        if position > 0 {
283            let Some(newline_index) = combined.iter().position(|byte| *byte == b'\n') else {
284                carry = combined;
285                continue;
286            };
287
288            if let Some(timestamp) = last_rate_limit_timestamp_in_lines(
289                combined
290                    .get(newline_index + 1..)
291                    .expect("newline index is within combined bytes"),
292            ) {
293                return Ok(Some(timestamp));
294            }
295
296            carry.clear();
297            carry.extend_from_slice(
298                combined
299                    .get(..newline_index)
300                    .expect("newline index is within combined bytes"),
301            );
302        } else if let Some(timestamp) = last_rate_limit_timestamp_in_lines(&combined) {
303            return Ok(Some(timestamp));
304        }
305    }
306
307    Ok(None)
308}
309
310fn last_rate_limit_timestamp_in_lines(bytes: &[u8]) -> Option<DateTime<Utc>> {
311    for line in bytes.split(|byte| *byte == b'\n').rev() {
312        let line = trim_line_end_bytes(line);
313        if line.is_empty() || !line_contains_bytes(line, b"\"rate_limits\"") {
314            continue;
315        };
316
317        let Ok(line) = std::str::from_utf8(line) else {
318            continue;
319        };
320        if let Some(timestamp) = timestamp_from_line(line) {
321            return Some(timestamp);
322        }
323    }
324
325    None
326}
327
328fn timestamp_from_line(line: &str) -> Option<DateTime<Utc>> {
329    let value = serde_json::from_str::<Value>(line).ok()?;
330    let object = value.as_object()?;
331    object.get("timestamp").and_then(value_to_utc)
332}
333
334fn value_to_utc(value: &Value) -> Option<DateTime<Utc>> {
335    match value {
336        Value::String(value) => DateTime::parse_from_rfc3339(value.trim())
337            .ok()
338            .map(|timestamp| timestamp.with_timezone(&Utc)),
339        Value::Number(number) => {
340            let millis = number
341                .as_i64()
342                .or_else(|| number.as_u64().and_then(|value| i64::try_from(value).ok()))?;
343            Utc.timestamp_millis_opt(millis).single()
344        }
345        _ => None,
346    }
347}
348
349fn trim_line_end_bytes(line: &[u8]) -> &[u8] {
350    line.strip_suffix(b"\r").unwrap_or(line)
351}
352
353fn line_contains_bytes(line: &[u8], needle: &[u8]) -> bool {
354    line.windows(needle.len()).any(|window| window == needle)
355}
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360    use crate::account_history::format_account_history_iso;
361    use crate::account_history::{
362        AccountHistoryAccount, AccountHistoryStore, AccountHistorySwitchEvent,
363        ACCOUNT_HISTORY_STORE_VERSION, AUTH_SELECT_SOURCE, DEFAULT_ACCOUNT_SOURCE,
364    };
365    use chrono::TimeZone;
366    use serde_json::json;
367    use std::io::Write;
368    use std::path::PathBuf;
369    use tempfile::TempDir;
370
371    #[test]
372    fn fixture_scan_returns_sorted_5h_and_7d_samples_with_diagnostics() {
373        let fixture = fixture_codex_home();
374
375        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
376            start: utc_time(2026, 5, 10, 0, 0),
377            end: utc_time(2026, 5, 12, 14, 0),
378            sessions_dir: fixture.join("sessions"),
379            scan_all_files: true,
380            account_history_file: Some(fixture.join("codex-ops/auth-account-history.json")),
381            account_id: None,
382            plan_type: None,
383            window_minutes: None,
384        })
385        .expect("read limit samples");
386
387        assert_eq!(report.samples.len(), 11);
388        assert!(report
389            .samples
390            .windows(2)
391            .all(|pair| pair[0].timestamp <= pair[1].timestamp));
392        assert_eq!(
393            report
394                .samples
395                .iter()
396                .filter(|sample| sample.window_minutes == 300)
397                .count(),
398            6
399        );
400        assert_eq!(
401            report
402                .samples
403                .iter()
404                .filter(|sample| sample.window_minutes == 10080)
405                .count(),
406            5
407        );
408        assert_eq!(report.diagnostics.null_rate_limits, 1);
409        assert_eq!(report.diagnostics.rate_limit_only_files, 2);
410        assert_eq!(report.diagnostics.rate_limit_events, 7);
411        assert_eq!(report.diagnostics.included_samples, 11);
412        assert_eq!(report.diagnostics.file_read_concurrency, 1);
413        assert_eq!(report.diagnostics.source_spans.len(), 11);
414        assert!(report
415            .samples
416            .iter()
417            .any(|sample| sample.account_id.as_deref() == Some("account-other")));
418    }
419
420    #[test]
421    fn account_plan_and_window_filters_are_applied_after_attribution() {
422        let fixture = fixture_codex_home();
423        let base = RateLimitSamplesReadOptions {
424            start: utc_time(2026, 5, 10, 0, 0),
425            end: utc_time(2026, 5, 12, 14, 0),
426            sessions_dir: fixture.join("sessions"),
427            scan_all_files: true,
428            account_history_file: Some(fixture.join("codex-ops/auth-account-history.json")),
429            account_id: None,
430            plan_type: None,
431            window_minutes: None,
432        };
433
434        let mut fixture_account = base.clone();
435        fixture_account.account_id = Some("account-fixture".to_string());
436        let report = read_rate_limit_samples_report(&fixture_account).expect("account filter");
437        assert_eq!(report.samples.len(), 10);
438        assert_eq!(report.diagnostics.account_mismatches, 1);
439
440        let mut other_account = base.clone();
441        other_account.account_id = Some("account-other".to_string());
442        let report = read_rate_limit_samples_report(&other_account).expect("other account filter");
443        assert_eq!(report.samples.len(), 1);
444        assert_eq!(report.samples[0].plan_type.as_deref(), Some("plus"));
445        assert_eq!(report.diagnostics.account_mismatches, 10);
446
447        let mut plan = base.clone();
448        plan.plan_type = Some("plus".to_string());
449        let report = read_rate_limit_samples_report(&plan).expect("plan filter");
450        assert_eq!(report.samples.len(), 1);
451        assert_eq!(report.diagnostics.plan_mismatches, 10);
452
453        let mut window = base;
454        window.window_minutes = Some(10080);
455        let report = read_rate_limit_samples_report(&window).expect("window filter");
456        assert_eq!(report.samples.len(), 5);
457        assert_eq!(report.diagnostics.window_mismatches, 6);
458    }
459
460    #[test]
461    fn account_filter_without_history_treats_samples_as_unknown_mismatches() {
462        let fixture = fixture_codex_home();
463
464        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
465            start: utc_time(2026, 5, 10, 0, 0),
466            end: utc_time(2026, 5, 12, 14, 0),
467            sessions_dir: fixture.join("sessions"),
468            scan_all_files: true,
469            account_history_file: None,
470            account_id: Some("account-fixture".to_string()),
471            plan_type: None,
472            window_minutes: None,
473        })
474        .expect("read limit samples");
475
476        assert!(report.samples.is_empty());
477        assert_eq!(report.diagnostics.account_mismatches, 11);
478    }
479
480    #[test]
481    fn tail_prefilter_reads_rate_limit_only_files_without_token_count() {
482        let temp = TempDir::new().expect("tempdir");
483        let sessions_dir = temp.path().join("sessions");
484        write_session_file(
485            &sessions_dir,
486            2026,
487            5,
488            10,
489            "rollout-2026-05-10T00-00-00-rate-limit-only.jsonl",
490            &[rate_limit_line(
491                "2026-05-12T00:00:01.000Z",
492                300,
493                12.0,
494                1778605200,
495            )],
496        );
497        write_session_file(
498            &sessions_dir,
499            2026,
500            5,
501            9,
502            "rollout-2026-05-09T00-00-00-stale-rate-limit.jsonl",
503            &[rate_limit_line(
504                "2026-05-11T23:59:59.000Z",
505                300,
506                92.0,
507                1778605200,
508            )],
509        );
510
511        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
512            start: utc_time(2026, 5, 12, 0, 0),
513            end: utc_time(2026, 5, 12, 1, 0),
514            sessions_dir,
515            scan_all_files: false,
516            account_history_file: None,
517            account_id: None,
518            plan_type: None,
519            window_minutes: None,
520        })
521        .expect("read limit samples");
522
523        assert_eq!(report.samples.len(), 1);
524        assert_eq!(report.samples[0].used_percent, 12.0);
525        assert_eq!(report.diagnostics.tail_read_files, 2);
526        assert_eq!(report.diagnostics.tail_read_hits, 1);
527        assert_eq!(report.diagnostics.prefiltered_files, 1);
528        assert_eq!(report.diagnostics.rate_limit_only_files, 1);
529    }
530
531    #[test]
532    fn fork_replayed_rate_limits_are_not_counted_for_child_sessions() {
533        let temp = TempDir::new().expect("tempdir");
534        let sessions_dir = temp.path().join("sessions");
535        let parent = "019e5110-1f92-7280-b02a-0876af32b81f";
536        let child = "019e5111-2720-73d0-8519-4c80dffbe80e";
537        let parent_lines = vec![
538            session_meta_line("2026-05-21T00:00:00.000Z", parent),
539            rate_limit_line("2026-05-21T00:00:01.000Z", 300, 10.0, 1779469200),
540        ];
541        let mut child_lines = vec![session_meta_line("2026-05-21T00:10:00.000Z", child)];
542        child_lines.extend(retimestamp_lines(&parent_lines, "2026-05-21T00:10:00.001Z"));
543        child_lines.push(rate_limit_line(
544            "2026-05-21T00:10:01.000Z",
545            300,
546            11.0,
547            1779469200,
548        ));
549        write_session_file(
550            &sessions_dir,
551            2026,
552            5,
553            21,
554            &format!("rollout-2026-05-21T00-00-00-{parent}.jsonl"),
555            &parent_lines,
556        );
557        write_session_file(
558            &sessions_dir,
559            2026,
560            5,
561            21,
562            &format!("rollout-2026-05-21T00-10-00-{child}.jsonl"),
563            &child_lines,
564        );
565
566        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
567            start: utc_time(2026, 5, 21, 0, 0),
568            end: utc_time(2026, 5, 21, 1, 0),
569            sessions_dir,
570            scan_all_files: false,
571            account_history_file: None,
572            account_id: None,
573            plan_type: None,
574            window_minutes: None,
575        })
576        .expect("read limit samples");
577
578        assert_eq!(report.samples.len(), 2);
579        assert_eq!(
580            report
581                .samples
582                .iter()
583                .filter(|sample| sample.session_id == child)
584                .count(),
585            1
586        );
587        assert_eq!(report.diagnostics.rate_limit_events, 2);
588        assert_eq!(report.diagnostics.fork_replay_lines_skipped, 2);
589    }
590
591    #[test]
592    fn serialized_samples_report_hides_source_evidence_by_default() {
593        let fixture = fixture_codex_home();
594        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
595            start: utc_time(2026, 5, 10, 0, 0),
596            end: utc_time(2026, 5, 12, 14, 0),
597            sessions_dir: fixture.join("sessions"),
598            scan_all_files: true,
599            account_history_file: Some(fixture.join("codex-ops/auth-account-history.json")),
600            account_id: None,
601            plan_type: None,
602            window_minutes: None,
603        })
604        .expect("read limit samples");
605
606        let value = serde_json::to_value(&report.samples).expect("sample json");
607        assert_no_source_evidence(&value);
608        let diagnostics = serde_json::to_value(&report.diagnostics).expect("diagnostics json");
609        assert_no_source_evidence(&diagnostics);
610    }
611
612    #[test]
613    fn scanner_accepts_synthetic_account_history_store_for_attribution() {
614        let temp = TempDir::new().expect("tempdir");
615        let sessions_dir = temp.path().join("sessions");
616        let history_file = temp.path().join("account-history.json");
617        write_account_history(&history_file);
618        write_session_file(
619            &sessions_dir,
620            2026,
621            5,
622            12,
623            "rollout-2026-05-12T10-00-00-before-switch.jsonl",
624            &[rate_limit_line(
625                "2026-05-12T10:00:01.000Z",
626                300,
627                20.0,
628                1778605200,
629            )],
630        );
631        write_session_file(
632            &sessions_dir,
633            2026,
634            5,
635            12,
636            "rollout-2026-05-12T13-00-00-after-switch.jsonl",
637            &[rate_limit_line(
638                "2026-05-12T13:00:01.000Z",
639                300,
640                22.0,
641                1778605200,
642            )],
643        );
644
645        let report = read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
646            start: utc_time(2026, 5, 12, 0, 0),
647            end: utc_time(2026, 5, 12, 14, 0),
648            sessions_dir,
649            scan_all_files: false,
650            account_history_file: Some(history_file),
651            account_id: None,
652            plan_type: None,
653            window_minutes: None,
654        })
655        .expect("read limit samples");
656
657        let accounts = report
658            .samples
659            .iter()
660            .map(|sample| sample.account_id.as_deref())
661            .collect::<Vec<_>>();
662        assert_eq!(
663            accounts,
664            vec![Some("account-fixture"), Some("account-other")]
665        );
666    }
667
668    fn fixture_codex_home() -> PathBuf {
669        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/fixtures/rust-run/codex-home")
670    }
671
672    fn write_session_file(
673        root: &Path,
674        year: i32,
675        month: u32,
676        day: u32,
677        file_name: &str,
678        lines: &[String],
679    ) -> PathBuf {
680        let dir = root
681            .join(format!("{year:04}"))
682            .join(format!("{month:02}"))
683            .join(format!("{day:02}"));
684        std::fs::create_dir_all(&dir).expect("create session dir");
685        let path = dir.join(file_name);
686        let mut file = std::fs::File::create(&path).expect("create session file");
687        for line in lines {
688            writeln!(file, "{line}").expect("write session line");
689        }
690        path
691    }
692
693    fn session_meta_line(timestamp: &str, session_id: &str) -> String {
694        json!({
695            "timestamp": timestamp,
696            "type": "session_meta",
697            "payload": {
698                "id": session_id,
699                "model": "gpt-5.5",
700                "cwd": "/workspace/limit-scan-test"
701            }
702        })
703        .to_string()
704    }
705
706    fn rate_limit_line(
707        timestamp: &str,
708        window_minutes: i64,
709        used_percent: f64,
710        resets_at: i64,
711    ) -> String {
712        json!({
713            "timestamp": timestamp,
714            "type": "event_msg",
715            "payload": {
716                "rate_limits": {
717                    "primary": {
718                        "window_minutes": window_minutes,
719                        "used_percent": used_percent,
720                        "resets_at": resets_at
721                    },
722                    "plan_type": "pro",
723                    "limit_id": "fixture-limit"
724                }
725            }
726        })
727        .to_string()
728    }
729
730    fn retimestamp_lines(lines: &[String], timestamp: &str) -> Vec<String> {
731        lines
732            .iter()
733            .map(|line| {
734                let mut value = serde_json::from_str::<Value>(line).expect("json line");
735                if let Value::Object(fields) = &mut value {
736                    fields.insert(
737                        "timestamp".to_string(),
738                        Value::String(timestamp.to_string()),
739                    );
740                }
741                serde_json::to_string(&value).expect("json string")
742            })
743            .collect()
744    }
745
746    fn write_account_history(path: &Path) {
747        let store = AccountHistoryStore {
748            version: ACCOUNT_HISTORY_STORE_VERSION,
749            default_account: Some(AccountHistoryAccount {
750                account_id: "account-fixture".to_string(),
751                observed_at: format_account_history_iso(utc_time(2026, 5, 12, 0, 0)),
752                source: DEFAULT_ACCOUNT_SOURCE.to_string(),
753                name: None,
754                email: None,
755                plan_type: Some("pro".to_string()),
756            }),
757            switches: vec![AccountHistorySwitchEvent {
758                timestamp: format_account_history_iso(utc_time(2026, 5, 12, 12, 30)),
759                from_account_id: "account-fixture".to_string(),
760                to_account_id: "account-other".to_string(),
761                source: AUTH_SELECT_SOURCE.to_string(),
762            }],
763        };
764        let content = serde_json::to_string_pretty(&store).expect("history json");
765        std::fs::write(path, format!("{content}\n")).expect("write history");
766    }
767
768    fn assert_no_source_evidence(value: &Value) {
769        match value {
770            Value::Object(object) => {
771                for key in ["source", "sourcePath", "sourceLine", "line", "lineNumber"] {
772                    assert!(object.get(key).is_none(), "unexpected source key {key}");
773                }
774                for value in object.values() {
775                    assert_no_source_evidence(value);
776                }
777            }
778            Value::Array(values) => {
779                for value in values {
780                    assert_no_source_evidence(value);
781                }
782            }
783            _ => {}
784        }
785    }
786
787    fn utc_time(year: i32, month: u32, day: u32, hour: u32, minute: u32) -> DateTime<Utc> {
788        Utc.with_ymd_and_hms(year, month, day, hour, minute, 0)
789            .single()
790            .expect("valid test time")
791    }
792}