Skip to main content

codex_ops/limits/
reports.rs

1use crate::error::AppError;
2use chrono::{DateTime, Duration, Utc};
3use serde::Serialize;
4use std::collections::{BTreeMap, BTreeSet};
5use std::path::PathBuf;
6
7#[derive(Clone, Debug, Default, Serialize, PartialEq)]
8#[serde(rename_all = "camelCase")]
9pub struct RateLimitParseDiagnostics {
10    pub invalid_json_lines: i64,
11    pub rate_limit_events: i64,
12    pub included_samples: i64,
13    pub null_rate_limits: i64,
14    pub missing_rate_limits: i64,
15    pub missing_timestamps: i64,
16    pub missing_windows: i64,
17    pub unknown_windows: i64,
18    pub invalid_window_minutes: i64,
19    pub invalid_used_percent: i64,
20    pub invalid_resets_at: i64,
21    pub out_of_range_percent: i64,
22}
23
24#[derive(Clone, Debug)]
25pub struct RateLimitSamplesReadOptions {
26    pub start: DateTime<Utc>,
27    pub end: DateTime<Utc>,
28    pub sessions_dir: PathBuf,
29    pub scan_all_files: bool,
30    pub account_history_file: Option<PathBuf>,
31    pub account_id: Option<String>,
32    pub plan_type: Option<String>,
33    pub window_minutes: Option<i64>,
34}
35
36#[derive(Clone, Debug)]
37pub struct RateLimitSamplesReport {
38    pub start: DateTime<Utc>,
39    pub end: DateTime<Utc>,
40    pub sessions_dir: String,
41    pub samples: Vec<RateLimitSample>,
42    pub diagnostics: RateLimitDiagnostics,
43}
44
45#[derive(Clone, Debug, Default, Serialize, PartialEq)]
46#[serde(rename_all = "camelCase")]
47pub struct RateLimitDiagnostics {
48    pub scan_all_files: bool,
49    pub scanned_directories: i64,
50    pub skipped_directories: i64,
51    pub read_files: i64,
52    pub skipped_files: i64,
53    pub prefiltered_files: i64,
54    pub tail_read_files: i64,
55    pub tail_read_hits: i64,
56    pub mtime_read_files: i64,
57    pub mtime_tail_hits: i64,
58    pub mtime_read_hits: i64,
59    pub fork_files: i64,
60    pub fork_parent_missing: i64,
61    pub fork_replay_lines: i64,
62    pub read_lines: i64,
63    pub invalid_json_lines: i64,
64    pub rate_limit_events: i64,
65    pub included_samples: i64,
66    pub null_rate_limits: i64,
67    pub missing_rate_limits: i64,
68    pub missing_timestamps: i64,
69    pub missing_windows: i64,
70    pub unknown_windows: i64,
71    pub invalid_window_minutes: i64,
72    pub invalid_used_percent: i64,
73    pub invalid_resets_at: i64,
74    pub out_of_range_percent: i64,
75    pub rate_limit_only_files: i64,
76    pub account_mismatches: i64,
77    pub plan_mismatches: i64,
78    pub window_mismatches: i64,
79    pub out_of_range_samples: i64,
80    pub fork_replay_lines_skipped: i64,
81    pub file_read_concurrency: i64,
82    #[serde(skip)]
83    pub(crate) source_spans: Vec<SourceSpan>,
84}
85
86impl RateLimitDiagnostics {
87    pub(crate) fn new(file_read_concurrency: i64, scan_all_files: bool) -> Self {
88        Self {
89            scan_all_files,
90            file_read_concurrency,
91            ..Self::default()
92        }
93    }
94
95    pub(crate) fn merge_parse(&mut self, other: &RateLimitParseDiagnostics) {
96        self.invalid_json_lines += other.invalid_json_lines;
97        self.rate_limit_events += other.rate_limit_events;
98        self.null_rate_limits += other.null_rate_limits;
99        self.missing_rate_limits += other.missing_rate_limits;
100        self.missing_timestamps += other.missing_timestamps;
101        self.missing_windows += other.missing_windows;
102        self.unknown_windows += other.unknown_windows;
103        self.invalid_window_minutes += other.invalid_window_minutes;
104        self.invalid_used_percent += other.invalid_used_percent;
105        self.invalid_resets_at += other.invalid_resets_at;
106        self.out_of_range_percent += other.out_of_range_percent;
107    }
108}
109
110#[derive(Clone, Debug, Eq, PartialEq)]
111pub struct SourceSpan {
112    pub(crate) path: String,
113    pub(crate) line_number: usize,
114}
115
116#[derive(Clone, Debug, Serialize, PartialEq)]
117#[serde(rename_all = "camelCase")]
118pub struct RateLimitSample {
119    pub timestamp: DateTime<Utc>,
120    pub session_id: String,
121    pub account_id: Option<String>,
122    pub plan_type: Option<String>,
123    pub limit_id: Option<String>,
124    pub window: String,
125    pub window_minutes: i64,
126    pub used_percent: f64,
127    pub remaining_percent: f64,
128    pub resets_at: DateTime<Utc>,
129    #[serde(skip)]
130    pub(crate) source: Option<SourceSpan>,
131}
132
133#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
134pub struct LimitReportOptions {
135    pub include_diagnostics: bool,
136    pub include_source_evidence: bool,
137}
138
139#[derive(Clone, Copy, Debug, Eq, PartialEq)]
140pub enum LimitWindowSelector {
141    FiveHours,
142    SevenDays,
143}
144
145impl LimitWindowSelector {
146    pub fn parse(value: &str) -> Result<Self, AppError> {
147        match value {
148            "5h" => Ok(Self::FiveHours),
149            "7d" => Ok(Self::SevenDays),
150            _ => Err(AppError::invalid_input(
151                "Invalid limit window. Expected one of: 5h, 7d.",
152            )),
153        }
154    }
155
156    pub fn as_str(self) -> &'static str {
157        match self {
158            Self::FiveHours => "5h",
159            Self::SevenDays => "7d",
160        }
161    }
162
163    pub fn window_minutes(self) -> i64 {
164        match self {
165            Self::FiveHours => 300,
166            Self::SevenDays => 10_080,
167        }
168    }
169}
170
171#[derive(Clone, Debug, Serialize, PartialEq)]
172#[serde(rename_all = "camelCase")]
173pub struct LimitSourceEvidence {
174    pub path: String,
175    pub line_number: usize,
176}
177
178#[derive(Clone, Debug, Serialize, PartialEq)]
179#[serde(rename_all = "camelCase")]
180pub struct LimitReportDiagnostics {
181    #[serde(flatten)]
182    pub scan: RateLimitDiagnostics,
183    pub duplicate_samples: i64,
184    pub unknown_limit_samples: i64,
185    pub unknown_limit_reset_events: i64,
186    pub ignored_inactive_stream_samples: i64,
187    #[serde(skip_serializing_if = "Vec::is_empty")]
188    pub source_evidence: Vec<LimitSourceEvidence>,
189}
190
191#[derive(Clone, Debug, Serialize, PartialEq)]
192#[serde(rename_all = "camelCase")]
193pub struct LimitWindow {
194    pub id: String,
195    pub account_id: Option<String>,
196    pub plan_type: Option<String>,
197    pub limit_id: Option<String>,
198    pub window: String,
199    pub window_minutes: i64,
200    pub estimated_start: DateTime<Utc>,
201    pub reset_at: DateTime<Utc>,
202    pub first_seen: DateTime<Utc>,
203    pub last_seen: DateTime<Utc>,
204    pub min_used_percent: f64,
205    pub max_used_percent: f64,
206    pub last_used_percent: f64,
207    pub sample_count: i64,
208    pub reset_kind: String,
209}
210
211#[derive(Clone, Debug, Serialize, PartialEq)]
212#[serde(rename_all = "camelCase")]
213pub struct LimitResetEvent {
214    pub at: DateTime<Utc>,
215    pub account_id: Option<String>,
216    pub plan_type: Option<String>,
217    pub limit_id: Option<String>,
218    pub window: String,
219    pub window_minutes: i64,
220    pub previous_used_percent: f64,
221    pub next_used_percent: f64,
222    pub previous_resets_at: DateTime<Utc>,
223    pub next_resets_at: DateTime<Utc>,
224    pub early_by_seconds: i64,
225    pub kind: String,
226}
227
228#[derive(Clone, Debug, Serialize, PartialEq)]
229#[serde(rename_all = "camelCase")]
230pub struct LimitTrendChange {
231    pub at: DateTime<Utc>,
232    pub account_id: Option<String>,
233    pub plan_type: Option<String>,
234    pub limit_id: Option<String>,
235    pub window: String,
236    pub window_minutes: i64,
237    pub used_percent: f64,
238    pub remaining_percent: f64,
239    pub delta_used_percent: Option<f64>,
240    pub resets_at: DateTime<Utc>,
241    pub kind: String,
242}
243
244#[derive(Clone, Debug, Serialize, PartialEq)]
245#[serde(rename_all = "camelCase")]
246pub struct LimitCurrentWindow {
247    pub id: String,
248    pub status: String,
249    pub account_id: Option<String>,
250    pub plan_type: Option<String>,
251    pub limit_id: Option<String>,
252    pub window: String,
253    pub window_minutes: i64,
254    pub last_seen: Option<DateTime<Utc>>,
255    pub used_percent: Option<f64>,
256    pub remaining_percent: Option<f64>,
257    pub resets_at: Option<DateTime<Utc>>,
258    pub reset_in_seconds: Option<i64>,
259}
260
261#[derive(Clone, Debug, Serialize, PartialEq)]
262#[serde(rename_all = "camelCase")]
263pub struct LimitSamplesReport {
264    pub status: String,
265    pub start: DateTime<Utc>,
266    pub end: DateTime<Utc>,
267    pub sessions_dir: String,
268    pub samples: Vec<RateLimitSample>,
269    #[serde(skip_serializing_if = "Option::is_none")]
270    pub diagnostics: Option<LimitReportDiagnostics>,
271}
272
273#[derive(Clone, Debug, Serialize, PartialEq)]
274#[serde(rename_all = "camelCase")]
275pub struct LimitWindowsReport {
276    pub status: String,
277    pub start: DateTime<Utc>,
278    pub end: DateTime<Utc>,
279    pub sessions_dir: String,
280    pub windows: Vec<LimitWindow>,
281    #[serde(skip_serializing_if = "Option::is_none")]
282    pub diagnostics: Option<LimitReportDiagnostics>,
283}
284
285#[derive(Clone, Debug, Serialize, PartialEq)]
286#[serde(rename_all = "camelCase")]
287pub struct LimitResetsReport {
288    pub status: String,
289    pub start: DateTime<Utc>,
290    pub end: DateTime<Utc>,
291    pub sessions_dir: String,
292    pub early_only: bool,
293    pub resets: Vec<LimitResetEvent>,
294    #[serde(skip_serializing_if = "Option::is_none")]
295    pub diagnostics: Option<LimitReportDiagnostics>,
296}
297
298#[derive(Clone, Debug, Serialize, PartialEq)]
299#[serde(rename_all = "camelCase")]
300pub struct LimitTrendReport {
301    pub status: String,
302    pub start: DateTime<Utc>,
303    pub end: DateTime<Utc>,
304    pub sessions_dir: String,
305    pub changes: Vec<LimitTrendChange>,
306    #[serde(skip_serializing_if = "Option::is_none")]
307    pub diagnostics: Option<LimitReportDiagnostics>,
308}
309
310#[derive(Clone, Debug, Serialize, PartialEq)]
311#[serde(rename_all = "camelCase")]
312pub struct LimitCurrentReport {
313    pub status: String,
314    pub now: DateTime<Utc>,
315    pub start: DateTime<Utc>,
316    pub end: DateTime<Utc>,
317    pub sessions_dir: String,
318    pub current: Vec<LimitCurrentWindow>,
319    #[serde(skip_serializing_if = "Option::is_none")]
320    pub diagnostics: Option<LimitReportDiagnostics>,
321}
322
323pub fn build_limit_samples_report(
324    input: &RateLimitSamplesReport,
325    options: LimitReportOptions,
326) -> LimitSamplesReport {
327    let (_, duplicate_samples) = normalized_samples(&input.samples);
328    LimitSamplesReport {
329        status: status_for_count(input.samples.len()),
330        start: input.start,
331        end: input.end,
332        sessions_dir: input.sessions_dir.clone(),
333        samples: input.samples.clone(),
334        diagnostics: diagnostics_for_options(input, duplicate_samples, 0, 0, options),
335    }
336}
337
338pub fn build_limit_windows_report(
339    input: &RateLimitSamplesReport,
340    options: LimitReportOptions,
341) -> LimitWindowsReport {
342    let (samples, duplicate_samples, ignored_inactive_stream_samples) =
343        normalized_derived_samples(&input.samples);
344    let windows = build_windows(&samples);
345
346    LimitWindowsReport {
347        status: status_for_count(windows.len()),
348        start: input.start,
349        end: input.end,
350        sessions_dir: input.sessions_dir.clone(),
351        windows,
352        diagnostics: diagnostics_for_options(
353            input,
354            duplicate_samples,
355            ignored_inactive_stream_samples,
356            0,
357            options,
358        ),
359    }
360}
361
362pub fn build_limit_resets_report(
363    input: &RateLimitSamplesReport,
364    early_only: bool,
365    options: LimitReportOptions,
366) -> LimitResetsReport {
367    let (samples, duplicate_samples, ignored_inactive_stream_samples) =
368        normalized_derived_samples(&input.samples);
369    let mut resets = build_resets(&samples);
370    if early_only {
371        resets.retain(|reset| reset.kind == RESET_KIND_EARLY);
372    }
373    let unknown_limit_reset_events = count_unknown_limit_reset_events(&resets);
374
375    LimitResetsReport {
376        status: status_for_count(samples.len()),
377        start: input.start,
378        end: input.end,
379        sessions_dir: input.sessions_dir.clone(),
380        early_only,
381        resets,
382        diagnostics: diagnostics_for_options(
383            input,
384            duplicate_samples,
385            ignored_inactive_stream_samples,
386            unknown_limit_reset_events,
387            options,
388        ),
389    }
390}
391
392pub fn build_limit_trend_report(
393    input: &RateLimitSamplesReport,
394    window_minutes: Option<i64>,
395    options: LimitReportOptions,
396) -> LimitTrendReport {
397    let (samples, duplicate_samples, ignored_inactive_stream_samples) =
398        normalized_derived_samples(&input.samples);
399    let changes = build_trend_changes(&samples, window_minutes);
400
401    LimitTrendReport {
402        status: status_for_count(changes.len()),
403        start: input.start,
404        end: input.end,
405        sessions_dir: input.sessions_dir.clone(),
406        changes,
407        diagnostics: diagnostics_for_options(
408            input,
409            duplicate_samples,
410            ignored_inactive_stream_samples,
411            0,
412            options,
413        ),
414    }
415}
416
417pub fn build_limit_current_report(
418    input: &RateLimitSamplesReport,
419    now: DateTime<Utc>,
420    options: LimitReportOptions,
421) -> LimitCurrentReport {
422    let (samples, duplicate_samples, ignored_inactive_stream_samples) =
423        normalized_derived_samples(&input.samples);
424    let current = build_current_windows(&samples, now);
425
426    LimitCurrentReport {
427        status: status_for_current(&current),
428        now,
429        start: input.start,
430        end: input.end,
431        sessions_dir: input.sessions_dir.clone(),
432        current,
433        diagnostics: diagnostics_for_options(
434            input,
435            duplicate_samples,
436            ignored_inactive_stream_samples,
437            0,
438            options,
439        ),
440    }
441}
442
443const UNKNOWN_ACCOUNT: &str = "unknown_account";
444const UNKNOWN_PLAN: &str = "unknown_plan";
445const UNKNOWN_LIMIT: &str = "unknown_limit";
446const RESET_KIND_FIRST_OBSERVED: &str = "firstObserved";
447const RESET_KIND_NORMAL: &str = "normal";
448const RESET_KIND_EARLY: &str = "early";
449const RESET_KIND_CHANGED: &str = "changed";
450const CURRENT_STATUS_ACTIVE: &str = "active";
451const CURRENT_STATUS_EXPIRED: &str = "expired";
452const TREND_KIND_INCREASED: &str = "increased";
453const TREND_KIND_DECREASED: &str = "decreased";
454const TREND_KIND_RESET_CHANGED: &str = "resetChanged";
455const RESET_JITTER_TOLERANCE_SECONDS: i64 = 60;
456const TREND_USED_DECREASE_NOISE_PERCENT: f64 = 1.0;
457const INACTIVE_STREAM_MIN_SAMPLES: usize = 3;
458const INACTIVE_STREAM_MIN_SPAN_SECONDS: i64 = 60;
459const PERCENT_EPSILON: f64 = 0.000_001;
460
461#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
462struct PartitionKey {
463    account_id: String,
464    plan_type: String,
465    limit_id: String,
466    window_minutes: i64,
467}
468
469#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
470struct SampleIdentity {
471    partition: PartitionKey,
472    timestamp: DateTime<Utc>,
473    resets_at: DateTime<Utc>,
474    window_minutes: i64,
475    limit_id: Option<String>,
476}
477
478#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
479struct TrendPartitionKey {
480    account_id: String,
481    plan_type: String,
482    limit_id: String,
483    window_minutes: i64,
484}
485
486#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
487struct TrendStreamKey {
488    account_id: String,
489    plan_type: String,
490    limit_id: String,
491}
492
493#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
494struct TrendObservationKey {
495    stream: TrendStreamKey,
496    timestamp: DateTime<Utc>,
497    session_id: String,
498    source_path: String,
499    source_line: usize,
500}
501
502#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
503struct TrendSourceKey {
504    stream: TrendStreamKey,
505    source: String,
506}
507
508#[derive(Clone, Debug)]
509struct TrendObservation {
510    stream: TrendStreamKey,
511    timestamp: DateTime<Utc>,
512    session_id: String,
513    source_path: String,
514    source_line: usize,
515    windows: BTreeMap<i64, RateLimitSample>,
516}
517
518#[derive(Clone, Debug)]
519struct WindowAccumulator {
520    partition: PartitionKey,
521    window: String,
522    reset_at: DateTime<Utc>,
523    first_seen: DateTime<Utc>,
524    last_seen: DateTime<Utc>,
525    min_used_percent: f64,
526    max_used_percent: f64,
527    last_used_percent: f64,
528    sample_count: i64,
529    reset_kind: &'static str,
530}
531
532impl WindowAccumulator {
533    fn new(sample: &RateLimitSample, reset_kind: &'static str) -> Self {
534        Self {
535            partition: partition_key(sample),
536            window: sample.window.clone(),
537            reset_at: sample.resets_at,
538            first_seen: sample.timestamp,
539            last_seen: sample.timestamp,
540            min_used_percent: sample.used_percent,
541            max_used_percent: sample.used_percent,
542            last_used_percent: sample.used_percent,
543            sample_count: 1,
544            reset_kind,
545        }
546    }
547
548    fn push(&mut self, sample: &RateLimitSample) {
549        self.reset_at = sample.resets_at;
550        self.last_seen = sample.timestamp;
551        self.min_used_percent = self.min_used_percent.min(sample.used_percent);
552        self.max_used_percent = self.max_used_percent.max(sample.used_percent);
553        self.last_used_percent = sample.used_percent;
554        self.sample_count += 1;
555    }
556
557    fn finish(self) -> LimitWindow {
558        let estimated_start = self
559            .reset_at
560            .checked_sub_signed(Duration::minutes(self.partition.window_minutes))
561            .unwrap_or(self.reset_at);
562        LimitWindow {
563            id: limit_window_id(&self.partition, self.reset_at, &self.window),
564            account_id: output_account_id(&self.partition),
565            plan_type: output_plan_type(&self.partition),
566            limit_id: output_limit_id(&self.partition),
567            window: self.window,
568            window_minutes: self.partition.window_minutes,
569            estimated_start,
570            reset_at: self.reset_at,
571            first_seen: self.first_seen,
572            last_seen: self.last_seen,
573            min_used_percent: self.min_used_percent,
574            max_used_percent: self.max_used_percent,
575            last_used_percent: self.last_used_percent,
576            sample_count: self.sample_count,
577            reset_kind: self.reset_kind.to_string(),
578        }
579    }
580}
581
582fn normalized_samples(samples: &[RateLimitSample]) -> (Vec<RateLimitSample>, i64) {
583    let mut sorted = samples.to_vec();
584    sorted.sort_by(|left, right| {
585        partition_key(left)
586            .cmp(&partition_key(right))
587            .then_with(|| left.timestamp.cmp(&right.timestamp))
588            .then_with(|| left.resets_at.cmp(&right.resets_at))
589            .then_with(|| left.window_minutes.cmp(&right.window_minutes))
590            .then_with(|| left.limit_id.cmp(&right.limit_id))
591            .then_with(|| left.session_id.cmp(&right.session_id))
592    });
593
594    let mut seen = BTreeSet::new();
595    let mut normalized = Vec::with_capacity(sorted.len());
596    let mut duplicate_samples = 0_i64;
597    for sample in sorted {
598        if seen.insert(sample_identity(&sample)) {
599            normalized.push(sample);
600        } else {
601            duplicate_samples += 1;
602        }
603    }
604
605    (normalized, duplicate_samples)
606}
607
608fn normalized_derived_samples(samples: &[RateLimitSample]) -> (Vec<RateLimitSample>, i64, i64) {
609    let (normalized, duplicate_samples) = normalized_samples(samples);
610    let (filtered, ignored_inactive_stream_samples) =
611        filter_inactive_rolling_zero_streams(normalized);
612    (filtered, duplicate_samples, ignored_inactive_stream_samples)
613}
614
615fn filter_inactive_rolling_zero_streams(
616    samples: Vec<RateLimitSample>,
617) -> (Vec<RateLimitSample>, i64) {
618    let inactive_partitions = inactive_rolling_zero_partitions(&samples);
619    if inactive_partitions.is_empty() {
620        return (samples, 0);
621    }
622
623    let mut ignored = 0_i64;
624    let mut filtered = Vec::with_capacity(samples.len());
625    for sample in samples {
626        if inactive_partitions.contains(&partition_key(&sample)) {
627            ignored += 1;
628        } else {
629            filtered.push(sample);
630        }
631    }
632    (filtered, ignored)
633}
634
635fn inactive_rolling_zero_partitions(samples: &[RateLimitSample]) -> BTreeSet<PartitionKey> {
636    let mut inactive = BTreeSet::new();
637    for (partition, partition_samples) in partitioned_samples(samples) {
638        if is_inactive_rolling_zero_stream(&partition_samples) {
639            inactive.insert(partition);
640        }
641    }
642    inactive
643}
644
645fn is_inactive_rolling_zero_stream(samples: &[&RateLimitSample]) -> bool {
646    if samples.len() < INACTIVE_STREAM_MIN_SAMPLES {
647        return false;
648    }
649
650    let first_seen = samples
651        .first()
652        .expect("inactive stream has first sample")
653        .timestamp;
654    let last_seen = samples
655        .last()
656        .expect("inactive stream has last sample")
657        .timestamp;
658    if (last_seen - first_seen).num_seconds() < INACTIVE_STREAM_MIN_SPAN_SECONDS {
659        return false;
660    }
661
662    samples.iter().all(|sample| {
663        sample.used_percent.abs() <= PERCENT_EPSILON && is_rolling_full_window_reset(sample)
664    })
665}
666
667fn is_rolling_full_window_reset(sample: &RateLimitSample) -> bool {
668    let Some(expected_reset) = sample
669        .timestamp
670        .checked_add_signed(Duration::minutes(sample.window_minutes))
671    else {
672        return false;
673    };
674    is_reset_time_equal_within_jitter(expected_reset, sample.resets_at)
675}
676
677fn build_windows(samples: &[RateLimitSample]) -> Vec<LimitWindow> {
678    let mut windows = Vec::new();
679    for (_, partition_samples) in partitioned_samples(samples) {
680        let mut current: Option<WindowAccumulator> = None;
681        let mut previous_sample: Option<RateLimitSample> = None;
682
683        for sample in partition_samples {
684            let reset_kind = previous_sample
685                .as_ref()
686                .map(|previous| transition_kind(previous, sample))
687                .unwrap_or(RESET_KIND_FIRST_OBSERVED);
688            match current.as_mut() {
689                Some(window)
690                    if is_reset_time_equal_within_jitter(window.reset_at, sample.resets_at) =>
691                {
692                    window.push(sample);
693                }
694                Some(_) => {
695                    windows.push(current.take().expect("window exists").finish());
696                    current = Some(WindowAccumulator::new(sample, reset_kind));
697                }
698                None => current = Some(WindowAccumulator::new(sample, RESET_KIND_FIRST_OBSERVED)),
699            }
700            previous_sample = Some((*sample).clone());
701        }
702
703        if let Some(window) = current {
704            windows.push(window.finish());
705        }
706    }
707
708    windows.sort_by(|left, right| {
709        left.reset_at
710            .cmp(&right.reset_at)
711            .then_with(|| left.first_seen.cmp(&right.first_seen))
712            .then_with(|| left.window_minutes.cmp(&right.window_minutes))
713            .then_with(|| left.account_id.cmp(&right.account_id))
714            .then_with(|| left.plan_type.cmp(&right.plan_type))
715            .then_with(|| left.limit_id.cmp(&right.limit_id))
716    });
717    windows
718}
719
720fn build_resets(samples: &[RateLimitSample]) -> Vec<LimitResetEvent> {
721    let mut events = Vec::new();
722    for (partition, partition_samples) in partitioned_samples(samples) {
723        for pair in partition_samples.windows(2) {
724            let previous = pair[0];
725            let next = pair[1];
726            if !is_reset_transition(previous, next) {
727                continue;
728            }
729            let kind = transition_kind(previous, next);
730            let early_by_seconds = (previous.resets_at - next.timestamp).num_seconds().max(0);
731            events.push(LimitResetEvent {
732                at: next.timestamp,
733                account_id: output_account_id(&partition),
734                plan_type: output_plan_type(&partition),
735                limit_id: output_limit_id(&partition),
736                window: next.window.clone(),
737                window_minutes: next.window_minutes,
738                previous_used_percent: previous.used_percent,
739                next_used_percent: next.used_percent,
740                previous_resets_at: previous.resets_at,
741                next_resets_at: next.resets_at,
742                early_by_seconds,
743                kind: kind.to_string(),
744            });
745        }
746    }
747
748    events.sort_by(|left, right| {
749        left.at
750            .cmp(&right.at)
751            .then_with(|| left.window_minutes.cmp(&right.window_minutes))
752            .then_with(|| left.account_id.cmp(&right.account_id))
753            .then_with(|| left.plan_type.cmp(&right.plan_type))
754    });
755    events
756}
757
758fn build_trend_changes(
759    samples: &[RateLimitSample],
760    window_minutes: Option<i64>,
761) -> Vec<LimitTrendChange> {
762    let mut changes = Vec::new();
763    let observations = compact_trend_observations_by_source(trend_observations(samples));
764    for (_, mut stream_observations) in trend_stream_observations(observations) {
765        stream_observations.sort_by(compare_trend_observation_order);
766        let mut state_by_window = BTreeMap::<i64, RateLimitSample>::new();
767
768        for observation in stream_observations {
769            let mut accepted_windows = BTreeMap::<i64, &'static str>::new();
770            for (window, sample) in &observation.windows {
771                if !is_active_trend_sample(sample) {
772                    continue;
773                }
774                let kind = match state_by_window.get(window) {
775                    Some(previous) => trend_window_change_kind(previous, sample),
776                    None => Some(RESET_KIND_FIRST_OBSERVED),
777                };
778                if let Some(kind) = kind {
779                    accepted_windows.insert(*window, kind);
780                }
781            }
782
783            if accepted_windows.is_empty() {
784                continue;
785            }
786
787            let output_windows = trend_output_windows(&observation, window_minutes);
788            for window in output_windows {
789                let Some(sample) = observation.windows.get(&window) else {
790                    continue;
791                };
792                if !is_active_trend_sample(sample) {
793                    continue;
794                }
795
796                let previous = state_by_window.get(&window);
797                if let Some(kind) = accepted_windows.get(&window) {
798                    changes.push(trend_change_from_sample(
799                        sample,
800                        previous.map(|previous| sample.used_percent - previous.used_percent),
801                        kind,
802                    ));
803                    state_by_window.insert(window, sample.clone());
804                }
805            }
806
807            for window in accepted_windows.keys() {
808                if let Some(sample) = observation.windows.get(window) {
809                    state_by_window.insert(*window, sample.clone());
810                }
811            }
812        }
813    }
814
815    changes.sort_by(|left, right| {
816        left.at
817            .cmp(&right.at)
818            .then_with(|| left.window_minutes.cmp(&right.window_minutes))
819            .then_with(|| left.account_id.cmp(&right.account_id))
820            .then_with(|| left.plan_type.cmp(&right.plan_type))
821            .then_with(|| left.limit_id.cmp(&right.limit_id))
822    });
823    changes
824}
825
826fn build_current_windows(
827    samples: &[RateLimitSample],
828    now: DateTime<Utc>,
829) -> Vec<LimitCurrentWindow> {
830    let mut windows_by_partition = BTreeMap::<
831        (i64, Option<String>, Option<String>, Option<String>, String),
832        Vec<LimitWindow>,
833    >::new();
834    for window in build_windows(samples)
835        .into_iter()
836        .filter(|window| window.first_seen <= now)
837    {
838        windows_by_partition
839            .entry((
840                window.window_minutes,
841                window.account_id.clone(),
842                window.plan_type.clone(),
843                window.limit_id.clone(),
844                window.window.clone(),
845            ))
846            .or_default()
847            .push(window);
848    }
849
850    let mut current = Vec::new();
851    for (_, mut partition_windows) in windows_by_partition {
852        partition_windows.sort_by(compare_limit_window_order);
853
854        if let Some(window) = partition_windows.last() {
855            current.push(limit_current_from_window(window, now));
856        }
857    }
858
859    current.sort_by(|left, right| {
860        left.window_minutes
861            .cmp(&right.window_minutes)
862            .then_with(|| left.account_id.cmp(&right.account_id))
863            .then_with(|| left.plan_type.cmp(&right.plan_type))
864            .then_with(|| left.limit_id.cmp(&right.limit_id))
865            .then_with(|| left.status.cmp(&right.status))
866            .then_with(|| left.resets_at.cmp(&right.resets_at))
867            .then_with(|| left.last_seen.cmp(&right.last_seen))
868    });
869    current
870}
871
872fn compare_limit_window_order(left: &LimitWindow, right: &LimitWindow) -> std::cmp::Ordering {
873    left.reset_at
874        .cmp(&right.reset_at)
875        .then_with(|| left.last_seen.cmp(&right.last_seen))
876        .then_with(|| left.first_seen.cmp(&right.first_seen))
877}
878
879fn limit_current_from_window(window: &LimitWindow, now: DateTime<Utc>) -> LimitCurrentWindow {
880    let active = window.reset_at > now;
881    LimitCurrentWindow {
882        id: format!("{}-current", window.id),
883        status: if active {
884            CURRENT_STATUS_ACTIVE
885        } else {
886            CURRENT_STATUS_EXPIRED
887        }
888        .to_string(),
889        account_id: window.account_id.clone(),
890        plan_type: window.plan_type.clone(),
891        limit_id: window.limit_id.clone(),
892        window: window.window.clone(),
893        window_minutes: window.window_minutes,
894        last_seen: Some(window.last_seen),
895        used_percent: Some(window.last_used_percent),
896        remaining_percent: Some(100.0 - window.last_used_percent),
897        resets_at: Some(window.reset_at),
898        reset_in_seconds: active.then_some((window.reset_at - now).num_seconds()),
899    }
900}
901
902fn partitioned_samples(
903    samples: &[RateLimitSample],
904) -> BTreeMap<PartitionKey, Vec<&RateLimitSample>> {
905    let mut partitions: BTreeMap<PartitionKey, Vec<&RateLimitSample>> = BTreeMap::new();
906    for sample in samples {
907        partitions
908            .entry(partition_key(sample))
909            .or_default()
910            .push(sample);
911    }
912    for partition_samples in partitions.values_mut() {
913        partition_samples.sort_by(|left, right| compare_sample_order(left, right));
914    }
915    partitions
916}
917
918fn trend_observations(samples: &[RateLimitSample]) -> Vec<TrendObservation> {
919    let mut observations = BTreeMap::<TrendObservationKey, TrendObservation>::new();
920    for sample in samples {
921        let key = trend_observation_key(sample);
922        observations
923            .entry(key.clone())
924            .or_insert_with(|| TrendObservation {
925                stream: key.stream.clone(),
926                timestamp: key.timestamp,
927                session_id: key.session_id.clone(),
928                source_path: key.source_path.clone(),
929                source_line: key.source_line,
930                windows: BTreeMap::new(),
931            })
932            .windows
933            .insert(sample.window_minutes, sample.clone());
934    }
935
936    observations.into_values().collect()
937}
938
939fn compact_trend_observations_by_source(
940    observations: Vec<TrendObservation>,
941) -> Vec<TrendObservation> {
942    let mut by_source = BTreeMap::<TrendSourceKey, Vec<TrendObservation>>::new();
943    for observation in observations {
944        by_source
945            .entry(TrendSourceKey {
946                stream: observation.stream.clone(),
947                source: trend_observation_source(&observation),
948            })
949            .or_default()
950            .push(observation);
951    }
952
953    let mut compacted = Vec::new();
954    for observations in by_source.values_mut() {
955        observations.sort_by(compare_trend_observation_order);
956        let mut previous: Option<&TrendObservation> = None;
957        for observation in observations.iter() {
958            if previous
959                .is_some_and(|previous| trend_observation_vector_equal(previous, observation))
960            {
961                continue;
962            }
963            compacted.push(observation.clone());
964            previous = Some(observation);
965        }
966    }
967
968    compacted
969}
970
971fn trend_stream_observations(
972    observations: Vec<TrendObservation>,
973) -> BTreeMap<TrendStreamKey, Vec<TrendObservation>> {
974    let mut by_stream = BTreeMap::<TrendStreamKey, Vec<TrendObservation>>::new();
975    for observation in observations {
976        by_stream
977            .entry(observation.stream.clone())
978            .or_default()
979            .push(observation);
980    }
981    by_stream
982}
983
984fn trend_observation_key(sample: &RateLimitSample) -> TrendObservationKey {
985    let (source_path, source_line) = sample
986        .source
987        .as_ref()
988        .map(|source| (source.path.clone(), source.line_number))
989        .unwrap_or_else(|| (String::new(), 0));
990    TrendObservationKey {
991        stream: trend_stream_key(sample),
992        timestamp: sample.timestamp,
993        session_id: sample.session_id.clone(),
994        source_path,
995        source_line,
996    }
997}
998
999fn trend_observation_source(observation: &TrendObservation) -> String {
1000    if observation.source_path.is_empty() {
1001        observation.session_id.clone()
1002    } else {
1003        observation.source_path.clone()
1004    }
1005}
1006
1007fn compare_trend_observation_order(
1008    left: &TrendObservation,
1009    right: &TrendObservation,
1010) -> std::cmp::Ordering {
1011    left.timestamp
1012        .cmp(&right.timestamp)
1013        .then_with(|| left.source_path.cmp(&right.source_path))
1014        .then_with(|| left.source_line.cmp(&right.source_line))
1015        .then_with(|| left.session_id.cmp(&right.session_id))
1016}
1017
1018fn trend_observation_vector_equal(left: &TrendObservation, right: &TrendObservation) -> bool {
1019    if left.windows.len() != right.windows.len() {
1020        return false;
1021    }
1022
1023    left.windows.iter().all(|(window, left_sample)| {
1024        right.windows.get(window).is_some_and(|right_sample| {
1025            left_sample.used_percent == right_sample.used_percent
1026                && left_sample.remaining_percent == right_sample.remaining_percent
1027                && left_sample.resets_at == right_sample.resets_at
1028        })
1029    })
1030}
1031
1032fn trend_output_windows(observation: &TrendObservation, window_minutes: Option<i64>) -> Vec<i64> {
1033    match window_minutes {
1034        Some(window) => vec![window],
1035        None => observation.windows.keys().copied().collect(),
1036    }
1037}
1038
1039fn compare_sample_order(left: &RateLimitSample, right: &RateLimitSample) -> std::cmp::Ordering {
1040    left.timestamp
1041        .cmp(&right.timestamp)
1042        .then_with(|| left.resets_at.cmp(&right.resets_at))
1043        .then_with(|| left.window_minutes.cmp(&right.window_minutes))
1044        .then_with(|| left.limit_id.cmp(&right.limit_id))
1045        .then_with(|| left.session_id.cmp(&right.session_id))
1046}
1047
1048fn sample_identity(sample: &RateLimitSample) -> SampleIdentity {
1049    SampleIdentity {
1050        partition: partition_key(sample),
1051        timestamp: sample.timestamp,
1052        resets_at: sample.resets_at,
1053        window_minutes: sample.window_minutes,
1054        limit_id: sample.limit_id.clone(),
1055    }
1056}
1057
1058fn trend_partition_key(sample: &RateLimitSample) -> TrendPartitionKey {
1059    TrendPartitionKey {
1060        account_id: sample
1061            .account_id
1062            .clone()
1063            .unwrap_or_else(|| UNKNOWN_ACCOUNT.to_string()),
1064        plan_type: sample
1065            .plan_type
1066            .clone()
1067            .unwrap_or_else(|| UNKNOWN_PLAN.to_string()),
1068        limit_id: sample
1069            .limit_id
1070            .clone()
1071            .unwrap_or_else(|| UNKNOWN_LIMIT.to_string()),
1072        window_minutes: sample.window_minutes,
1073    }
1074}
1075
1076fn trend_stream_key(sample: &RateLimitSample) -> TrendStreamKey {
1077    TrendStreamKey {
1078        account_id: sample
1079            .account_id
1080            .clone()
1081            .unwrap_or_else(|| UNKNOWN_ACCOUNT.to_string()),
1082        plan_type: sample
1083            .plan_type
1084            .clone()
1085            .unwrap_or_else(|| UNKNOWN_PLAN.to_string()),
1086        limit_id: sample
1087            .limit_id
1088            .clone()
1089            .unwrap_or_else(|| UNKNOWN_LIMIT.to_string()),
1090    }
1091}
1092
1093fn partition_key(sample: &RateLimitSample) -> PartitionKey {
1094    PartitionKey {
1095        account_id: sample
1096            .account_id
1097            .clone()
1098            .unwrap_or_else(|| UNKNOWN_ACCOUNT.to_string()),
1099        plan_type: sample
1100            .plan_type
1101            .clone()
1102            .unwrap_or_else(|| UNKNOWN_PLAN.to_string()),
1103        limit_id: sample
1104            .limit_id
1105            .clone()
1106            .unwrap_or_else(|| UNKNOWN_LIMIT.to_string()),
1107        window_minutes: sample.window_minutes,
1108    }
1109}
1110
1111fn output_account_id(partition: &PartitionKey) -> Option<String> {
1112    (partition.account_id != UNKNOWN_ACCOUNT).then(|| partition.account_id.clone())
1113}
1114
1115fn output_plan_type(partition: &PartitionKey) -> Option<String> {
1116    (partition.plan_type != UNKNOWN_PLAN).then(|| partition.plan_type.clone())
1117}
1118
1119fn output_limit_id(partition: &PartitionKey) -> Option<String> {
1120    (partition.limit_id != UNKNOWN_LIMIT).then(|| partition.limit_id.clone())
1121}
1122
1123fn output_trend_account_id(partition: &TrendPartitionKey) -> Option<String> {
1124    (partition.account_id != UNKNOWN_ACCOUNT).then(|| partition.account_id.clone())
1125}
1126
1127fn output_trend_plan_type(partition: &TrendPartitionKey) -> Option<String> {
1128    (partition.plan_type != UNKNOWN_PLAN).then(|| partition.plan_type.clone())
1129}
1130
1131fn output_trend_limit_id(partition: &TrendPartitionKey) -> Option<String> {
1132    (partition.limit_id != UNKNOWN_LIMIT).then(|| partition.limit_id.clone())
1133}
1134
1135fn trend_change_from_sample(
1136    sample: &RateLimitSample,
1137    delta_used_percent: Option<f64>,
1138    kind: &str,
1139) -> LimitTrendChange {
1140    let partition = trend_partition_key(sample);
1141    LimitTrendChange {
1142        at: sample.timestamp,
1143        account_id: output_trend_account_id(&partition),
1144        plan_type: output_trend_plan_type(&partition),
1145        limit_id: output_trend_limit_id(&partition),
1146        window: sample.window.clone(),
1147        window_minutes: sample.window_minutes,
1148        used_percent: sample.used_percent,
1149        remaining_percent: sample.remaining_percent,
1150        delta_used_percent,
1151        resets_at: sample.resets_at,
1152        kind: kind.to_string(),
1153    }
1154}
1155
1156fn trend_window_change_kind(
1157    previous: &RateLimitSample,
1158    next: &RateLimitSample,
1159) -> Option<&'static str> {
1160    let used_delta = next.used_percent - previous.used_percent;
1161    if used_delta > PERCENT_EPSILON {
1162        Some(TREND_KIND_INCREASED)
1163    } else if used_delta < -PERCENT_EPSILON {
1164        if is_reset_jitter_change(previous, next)
1165            || used_delta.abs() <= TREND_USED_DECREASE_NOISE_PERCENT + PERCENT_EPSILON
1166        {
1167            None
1168        } else {
1169            Some(TREND_KIND_DECREASED)
1170        }
1171    } else if is_significant_reset_change(previous, next) {
1172        Some(TREND_KIND_RESET_CHANGED)
1173    } else {
1174        None
1175    }
1176}
1177
1178fn is_reset_jitter_change(previous: &RateLimitSample, next: &RateLimitSample) -> bool {
1179    previous.resets_at != next.resets_at && is_reset_equal_for_trend(previous, next)
1180}
1181
1182fn is_active_trend_sample(sample: &RateLimitSample) -> bool {
1183    sample.resets_at > sample.timestamp
1184}
1185
1186fn is_significant_reset_change(previous: &RateLimitSample, next: &RateLimitSample) -> bool {
1187    previous.resets_at != next.resets_at && !is_reset_equal_for_trend(previous, next)
1188}
1189
1190fn is_reset_equal_for_trend(previous: &RateLimitSample, next: &RateLimitSample) -> bool {
1191    is_reset_time_equal_within_jitter(previous.resets_at, next.resets_at)
1192}
1193
1194fn is_reset_time_equal_within_jitter(left: DateTime<Utc>, right: DateTime<Utc>) -> bool {
1195    (right - left).num_seconds().abs() <= RESET_JITTER_TOLERANCE_SECONDS
1196}
1197
1198fn is_reset_transition(previous: &RateLimitSample, next: &RateLimitSample) -> bool {
1199    next.resets_at > next.timestamp
1200        && previous.resets_at != next.resets_at
1201        && !is_reset_equal_for_trend(previous, next)
1202        && next.used_percent < previous.used_percent
1203}
1204
1205fn transition_kind(previous: &RateLimitSample, next: &RateLimitSample) -> &'static str {
1206    if is_reset_transition(previous, next) {
1207        if next.timestamp < previous.resets_at {
1208            RESET_KIND_EARLY
1209        } else {
1210            RESET_KIND_NORMAL
1211        }
1212    } else if previous.resets_at != next.resets_at {
1213        RESET_KIND_CHANGED
1214    } else {
1215        RESET_KIND_FIRST_OBSERVED
1216    }
1217}
1218
1219fn limit_window_id(partition: &PartitionKey, reset_at: DateTime<Utc>, window: &str) -> String {
1220    format!(
1221        "{}-{}-{}-{}-reset-{}",
1222        sanitize_id_part(window),
1223        sanitize_id_part(&partition.account_id),
1224        sanitize_id_part(&partition.plan_type),
1225        sanitize_id_part(&partition.limit_id),
1226        reset_at.timestamp()
1227    )
1228}
1229
1230fn sanitize_id_part(value: &str) -> String {
1231    let sanitized = value
1232        .chars()
1233        .map(|char| {
1234            if char.is_ascii_alphanumeric() {
1235                char.to_ascii_lowercase()
1236            } else {
1237                '-'
1238            }
1239        })
1240        .collect::<String>()
1241        .trim_matches('-')
1242        .to_string();
1243
1244    if sanitized.is_empty() {
1245        "unknown".to_string()
1246    } else {
1247        sanitized
1248    }
1249}
1250
1251fn status_for_count(count: usize) -> String {
1252    if count == 0 {
1253        "unobserved".to_string()
1254    } else {
1255        "ok".to_string()
1256    }
1257}
1258
1259fn status_for_current(current: &[LimitCurrentWindow]) -> String {
1260    if current.is_empty() {
1261        "unobserved".to_string()
1262    } else if current
1263        .iter()
1264        .any(|window| window.status == CURRENT_STATUS_ACTIVE)
1265    {
1266        "ok".to_string()
1267    } else {
1268        CURRENT_STATUS_EXPIRED.to_string()
1269    }
1270}
1271
1272fn count_unknown_limit_samples(samples: &[RateLimitSample]) -> i64 {
1273    samples
1274        .iter()
1275        .filter(|sample| sample.limit_id.is_none())
1276        .count() as i64
1277}
1278
1279fn count_unknown_limit_reset_events(resets: &[LimitResetEvent]) -> i64 {
1280    resets
1281        .iter()
1282        .filter(|reset| reset.limit_id.is_none())
1283        .count() as i64
1284}
1285
1286fn diagnostics_for_options(
1287    input: &RateLimitSamplesReport,
1288    duplicate_samples: i64,
1289    ignored_inactive_stream_samples: i64,
1290    unknown_limit_reset_events: i64,
1291    options: LimitReportOptions,
1292) -> Option<LimitReportDiagnostics> {
1293    if !options.include_diagnostics {
1294        return None;
1295    }
1296
1297    let mut scan = input.diagnostics.clone();
1298    let source_evidence = if options.include_source_evidence {
1299        scan.source_spans
1300            .iter()
1301            .map(|source| LimitSourceEvidence {
1302                path: source.path.clone(),
1303                line_number: source.line_number,
1304            })
1305            .collect()
1306    } else {
1307        Vec::new()
1308    };
1309    scan.source_spans.clear();
1310
1311    Some(LimitReportDiagnostics {
1312        scan,
1313        duplicate_samples,
1314        unknown_limit_samples: count_unknown_limit_samples(&input.samples),
1315        unknown_limit_reset_events,
1316        ignored_inactive_stream_samples,
1317        source_evidence,
1318    })
1319}
1320
1321#[cfg(test)]
1322mod tests {
1323    use super::*;
1324    use crate::limits::read_rate_limit_samples_report;
1325    use chrono::TimeZone;
1326    use serde_json::Value;
1327    use std::path::PathBuf;
1328
1329    #[test]
1330    fn builds_windows_without_crossing_account_plan_or_window_partitions() {
1331        let input = fixture_samples_report();
1332
1333        let report = build_limit_windows_report(&input, LimitReportOptions::default());
1334
1335        assert_eq!(report.status, "ok");
1336        assert_eq!(report.windows.len(), 9);
1337        assert!(report
1338            .windows
1339            .windows(2)
1340            .all(|pair| pair[0].reset_at <= pair[1].reset_at));
1341
1342        let first_primary = report
1343            .windows
1344            .iter()
1345            .find(|window| {
1346                window.window_minutes == 300
1347                    && window.account_id.as_deref() == Some("account-fixture")
1348                    && window.plan_type.as_deref() == Some("pro")
1349                    && window.reset_at == utc_time(2026, 5, 10, 14, 0)
1350            })
1351            .expect("first primary window");
1352        assert_eq!(first_primary.window, "5h");
1353        assert_eq!(first_primary.estimated_start, utc_time(2026, 5, 10, 9, 0));
1354        assert_eq!(first_primary.sample_count, 1);
1355        assert_eq!(first_primary.reset_kind, RESET_KIND_FIRST_OBSERVED);
1356
1357        let weekly_early = report
1358            .windows
1359            .iter()
1360            .find(|window| {
1361                window.window_minutes == 10080
1362                    && window.account_id.as_deref() == Some("account-fixture")
1363                    && window.plan_type.as_deref() == Some("pro")
1364                    && window.reset_at == utc_time(2026, 5, 19, 9, 0)
1365            })
1366            .expect("early weekly window");
1367        assert_eq!(weekly_early.reset_kind, RESET_KIND_EARLY);
1368        assert_eq!(weekly_early.min_used_percent, 4.0);
1369        assert_eq!(weekly_early.max_used_percent, 4.0);
1370
1371        let plus_window = report
1372            .windows
1373            .iter()
1374            .find(|window| window.account_id.as_deref() == Some("account-other"))
1375            .expect("plus account window");
1376        assert_eq!(plus_window.plan_type.as_deref(), Some("plus"));
1377        assert_eq!(plus_window.window_minutes, 300);
1378    }
1379
1380    #[test]
1381    fn windows_merge_reset_jitter_into_one_logical_window() {
1382        let first_reset = utc_time(2026, 5, 12, 17, 0);
1383        let next_reset = utc_time(2026, 5, 12, 18, 0);
1384        let input = RateLimitSamplesReport {
1385            start: utc_time(2026, 5, 12, 12, 0),
1386            end: utc_time(2026, 5, 12, 13, 0),
1387            sessions_dir: "/tmp/sessions".to_string(),
1388            samples: vec![
1389                trend_sample(0.0, first_reset, 0),
1390                trend_sample(1.0, first_reset + Duration::seconds(1), 1),
1391                trend_sample(0.0, first_reset, 2),
1392                trend_sample(2.0, first_reset + Duration::seconds(30), 3),
1393                trend_sample(0.0, next_reset, 4),
1394            ],
1395            diagnostics: RateLimitDiagnostics::default(),
1396        };
1397
1398        let report = build_limit_windows_report(&input, LimitReportOptions::default());
1399
1400        assert_eq!(report.windows.len(), 2);
1401        let jittered = &report.windows[0];
1402        assert_eq!(jittered.reset_at, first_reset + Duration::seconds(30));
1403        assert_eq!(jittered.sample_count, 4);
1404        assert_eq!(jittered.min_used_percent, 0.0);
1405        assert_eq!(jittered.max_used_percent, 2.0);
1406        assert_eq!(jittered.last_used_percent, 2.0);
1407        assert_eq!(jittered.reset_kind, RESET_KIND_FIRST_OBSERVED);
1408        assert_eq!(report.windows[1].reset_kind, RESET_KIND_EARLY);
1409    }
1410
1411    #[test]
1412    fn derived_reports_ignore_inactive_rolling_zero_streams() {
1413        let first_seen = utc_time(2026, 5, 5, 13, 0);
1414        let window_minutes = 10_080;
1415        let codex_reset = utc_time(2026, 5, 12, 7, 15);
1416        let rolling_reset = |offset_minutes: i64, jitter_seconds: i64| {
1417            first_seen
1418                + Duration::minutes(offset_minutes)
1419                + Duration::minutes(window_minutes)
1420                + Duration::seconds(jitter_seconds)
1421        };
1422        let input = RateLimitSamplesReport {
1423            start: first_seen,
1424            end: first_seen + Duration::minutes(10),
1425            sessions_dir: "/tmp/sessions".to_string(),
1426            samples: vec![
1427                report_sample("codex", first_seen, 4.0, codex_reset, window_minutes),
1428                report_sample(
1429                    "codex",
1430                    first_seen + Duration::minutes(4),
1431                    4.0,
1432                    codex_reset,
1433                    window_minutes,
1434                ),
1435                report_sample(
1436                    "codex_bengalfox",
1437                    first_seen,
1438                    0.0,
1439                    rolling_reset(0, -3),
1440                    window_minutes,
1441                ),
1442                report_sample(
1443                    "codex_bengalfox",
1444                    first_seen + Duration::minutes(2),
1445                    0.0,
1446                    rolling_reset(2, -1),
1447                    window_minutes,
1448                ),
1449                report_sample(
1450                    "codex_bengalfox",
1451                    first_seen + Duration::minutes(4),
1452                    0.0,
1453                    rolling_reset(4, 0),
1454                    window_minutes,
1455                ),
1456            ],
1457            diagnostics: RateLimitDiagnostics::default(),
1458        };
1459
1460        let samples = build_limit_samples_report(&input, LimitReportOptions::default());
1461        assert!(samples
1462            .samples
1463            .iter()
1464            .any(|sample| sample.limit_id.as_deref() == Some("codex_bengalfox")));
1465
1466        let windows = build_limit_windows_report(
1467            &input,
1468            LimitReportOptions {
1469                include_diagnostics: true,
1470                include_source_evidence: false,
1471            },
1472        );
1473        assert_eq!(windows.windows.len(), 1);
1474        assert_eq!(windows.windows[0].limit_id.as_deref(), Some("codex"));
1475        assert_eq!(
1476            windows
1477                .diagnostics
1478                .as_ref()
1479                .expect("windows diagnostics")
1480                .ignored_inactive_stream_samples,
1481            3
1482        );
1483
1484        let trend =
1485            build_limit_trend_report(&input, Some(window_minutes), LimitReportOptions::default());
1486        assert_eq!(trend.changes.len(), 1);
1487        assert_eq!(trend.changes[0].limit_id.as_deref(), Some("codex"));
1488
1489        let current = build_limit_current_report(
1490            &input,
1491            first_seen + Duration::minutes(5),
1492            LimitReportOptions::default(),
1493        );
1494        assert_eq!(current.current.len(), 1);
1495        assert_eq!(current.current[0].limit_id.as_deref(), Some("codex"));
1496    }
1497
1498    #[test]
1499    fn detects_normal_and_early_resets_within_each_partition() {
1500        let input = fixture_samples_report();
1501
1502        let report = build_limit_resets_report(&input, false, LimitReportOptions::default());
1503
1504        assert_eq!(report.status, "ok");
1505        assert_eq!(report.resets.len(), 4);
1506        assert!(report
1507            .resets
1508            .iter()
1509            .all(|reset| reset.account_id.as_deref() == Some("account-fixture")));
1510        assert!(report
1511            .resets
1512            .iter()
1513            .any(|reset| reset.window == "7d" && reset.kind == RESET_KIND_NORMAL));
1514
1515        let early_weekly = report
1516            .resets
1517            .iter()
1518            .find(|reset| reset.window == "7d" && reset.kind == RESET_KIND_EARLY)
1519            .expect("early weekly reset");
1520        assert_eq!(early_weekly.at, utc_time(2026, 5, 12, 12, 0));
1521        assert_eq!(early_weekly.previous_used_percent, 91.0);
1522        assert_eq!(early_weekly.next_used_percent, 4.0);
1523        assert_eq!(early_weekly.previous_resets_at, utc_time(2026, 5, 18, 9, 0));
1524        assert_eq!(early_weekly.next_resets_at, utc_time(2026, 5, 19, 9, 0));
1525        assert_eq!(early_weekly.early_by_seconds, 507_600);
1526
1527        let early_only = build_limit_resets_report(&input, true, LimitReportOptions::default());
1528        assert_eq!(early_only.resets.len(), 2);
1529        assert!(early_only
1530            .resets
1531            .iter()
1532            .all(|reset| reset.kind == RESET_KIND_EARLY));
1533    }
1534
1535    #[test]
1536    fn resets_do_not_cross_limit_id_streams() {
1537        let input = RateLimitSamplesReport {
1538            start: utc_time(2026, 5, 12, 0, 0),
1539            end: utc_time(2026, 5, 12, 2, 0),
1540            sessions_dir: "/tmp/sessions".to_string(),
1541            samples: vec![
1542                reset_sample("limit-alpha", 0, 80.0, utc_time(2026, 5, 19, 0, 0)),
1543                reset_sample("limit-beta", 1, 4.0, utc_time(2026, 5, 20, 0, 0)),
1544                reset_sample("limit-alpha", 2, 82.0, utc_time(2026, 5, 19, 0, 0)),
1545                reset_sample("limit-alpha", 3, 2.0, utc_time(2026, 5, 20, 0, 0)),
1546                reset_sample("limit-beta", 4, 5.0, utc_time(2026, 5, 20, 0, 0)),
1547            ],
1548            diagnostics: RateLimitDiagnostics::default(),
1549        };
1550
1551        let report = build_limit_resets_report(&input, false, LimitReportOptions::default());
1552
1553        assert_eq!(report.resets.len(), 1);
1554        let reset = &report.resets[0];
1555        assert_eq!(reset.limit_id.as_deref(), Some("limit-alpha"));
1556        assert_eq!(reset.previous_used_percent, 82.0);
1557        assert_eq!(reset.next_used_percent, 2.0);
1558        assert_eq!(reset.previous_resets_at, utc_time(2026, 5, 19, 0, 0));
1559        assert_eq!(reset.next_resets_at, utc_time(2026, 5, 20, 0, 0));
1560    }
1561
1562    #[test]
1563    fn reset_diagnostics_count_unknown_limit_risk() {
1564        let input = RateLimitSamplesReport {
1565            start: utc_time(2026, 5, 12, 0, 0),
1566            end: utc_time(2026, 5, 12, 2, 0),
1567            sessions_dir: "/tmp/sessions".to_string(),
1568            samples: vec![
1569                unknown_limit_reset_sample(0, 40.0, utc_time(2026, 5, 19, 0, 0)),
1570                unknown_limit_reset_sample(1, 4.0, utc_time(2026, 5, 20, 0, 0)),
1571            ],
1572            diagnostics: RateLimitDiagnostics::default(),
1573        };
1574
1575        let report = build_limit_resets_report(
1576            &input,
1577            false,
1578            LimitReportOptions {
1579                include_diagnostics: true,
1580                include_source_evidence: false,
1581            },
1582        );
1583        let diagnostics = report.diagnostics.expect("diagnostics");
1584
1585        assert_eq!(report.resets.len(), 1);
1586        assert_eq!(report.resets[0].limit_id, None);
1587        assert_eq!(diagnostics.unknown_limit_samples, 2);
1588        assert_eq!(diagnostics.unknown_limit_reset_events, 1);
1589    }
1590
1591    #[test]
1592    fn builds_trend_change_points_and_compresses_duplicates() {
1593        let input = trend_samples_report();
1594
1595        let report = build_limit_trend_report(&input, Some(300), LimitReportOptions::default());
1596
1597        assert_eq!(report.status, "ok");
1598        assert_eq!(report.changes.len(), 4);
1599        assert!(report
1600            .changes
1601            .iter()
1602            .all(|change| change.used_percent != 24.0));
1603        assert!(report
1604            .changes
1605            .iter()
1606            .all(|change| change.resets_at != utc_time(2026, 5, 12, 17, 0) + Duration::seconds(1)));
1607        assert_eq!(report.changes[0].kind, RESET_KIND_FIRST_OBSERVED);
1608        assert_eq!(report.changes[0].used_percent, 20.0);
1609        assert_eq!(report.changes[0].delta_used_percent, None);
1610        assert_eq!(report.changes[1].kind, TREND_KIND_INCREASED);
1611        assert_eq!(report.changes[1].used_percent, 25.0);
1612        assert_eq!(report.changes[1].delta_used_percent, Some(5.0));
1613        assert_eq!(report.changes[2].kind, TREND_KIND_DECREASED);
1614        assert_eq!(report.changes[2].used_percent, 15.0);
1615        assert_eq!(report.changes[2].delta_used_percent, Some(-10.0));
1616        assert_eq!(report.changes[3].kind, TREND_KIND_RESET_CHANGED);
1617        assert_eq!(report.changes[3].used_percent, 15.0);
1618        assert_eq!(report.changes[3].delta_used_percent, Some(0.0));
1619        assert_eq!(report.changes[3].resets_at, utc_time(2026, 5, 12, 18, 0));
1620        assert!(report
1621            .changes
1622            .iter()
1623            .all(|change| change.limit_id.as_deref() == Some("fixture-trend-change")));
1624    }
1625
1626    #[test]
1627    fn trend_uses_monotonic_window_progress_across_parallel_sources() {
1628        let reset_zero = utc_time(2026, 5, 18, 5, 12);
1629        let reset_progress = reset_zero + Duration::seconds(11);
1630        let expired_reset = utc_time(2026, 5, 17, 18, 30);
1631        let input = RateLimitSamplesReport {
1632            start: utc_time(2026, 5, 18, 0, 0),
1633            end: utc_time(2026, 5, 18, 1, 0),
1634            sessions_dir: "/tmp/sessions".to_string(),
1635            samples: vec![
1636                trend_window_sample("stale", 1, 0, 0.0, reset_zero, 300),
1637                trend_window_sample("progress", 1, 1, 1.0, reset_progress, 300),
1638                trend_window_sample("stale", 2, 2, 0.0, reset_zero, 300),
1639                trend_window_sample("progress", 2, 3, 2.0, reset_progress, 300),
1640                trend_window_sample("expired", 1, 4, 18.0, expired_reset, 300),
1641                trend_window_sample("stale", 3, 5, 0.0, reset_zero, 300),
1642                trend_window_sample("progress", 3, 6, 4.0, reset_progress, 300),
1643            ],
1644            diagnostics: RateLimitDiagnostics::default(),
1645        };
1646
1647        let report = build_limit_trend_report(&input, Some(300), LimitReportOptions::default());
1648
1649        assert_eq!(report.changes.len(), 4);
1650        assert_eq!(
1651            report
1652                .changes
1653                .iter()
1654                .map(|change| change.used_percent)
1655                .collect::<Vec<_>>(),
1656            vec![0.0, 1.0, 2.0, 4.0]
1657        );
1658        assert!(report
1659            .changes
1660            .iter()
1661            .all(|change| change.kind != TREND_KIND_DECREASED));
1662        assert!(report
1663            .changes
1664            .iter()
1665            .all(|change| change.resets_at != expired_reset));
1666    }
1667
1668    #[test]
1669    fn trend_selected_window_omits_unchanged_sibling_points() {
1670        let five_hour_reset = utc_time(2026, 5, 18, 5, 0);
1671        let weekly_reset = utc_time(2026, 5, 25, 5, 0);
1672        let input = RateLimitSamplesReport {
1673            start: utc_time(2026, 5, 18, 0, 0),
1674            end: utc_time(2026, 5, 18, 1, 0),
1675            sessions_dir: "/tmp/sessions".to_string(),
1676            samples: vec![
1677                trend_window_sample("vector", 1, 0, 0.0, five_hour_reset, 300),
1678                trend_window_sample("vector", 1, 0, 10.0, weekly_reset, 10_080),
1679                trend_window_sample("vector", 2, 1, 1.0, five_hour_reset, 300),
1680                trend_window_sample("vector", 2, 1, 10.0, weekly_reset, 10_080),
1681                trend_window_sample("vector", 3, 2, 2.0, five_hour_reset, 300),
1682                trend_window_sample("vector", 3, 2, 11.0, weekly_reset, 10_080),
1683            ],
1684            diagnostics: RateLimitDiagnostics::default(),
1685        };
1686
1687        let report = build_limit_trend_report(&input, Some(10_080), LimitReportOptions::default());
1688
1689        assert_eq!(report.changes.len(), 2);
1690        assert!(report.changes.iter().all(|change| change.window == "7d"));
1691        assert_eq!(report.changes[0].kind, RESET_KIND_FIRST_OBSERVED);
1692        assert_eq!(report.changes[0].used_percent, 10.0);
1693        assert_eq!(report.changes[1].kind, TREND_KIND_INCREASED);
1694        assert_eq!(report.changes[1].used_percent, 11.0);
1695    }
1696
1697    #[test]
1698    fn builds_current_report_and_unobserved_status() {
1699        let input = fixture_samples_report();
1700
1701        let report = build_limit_current_report(
1702            &input,
1703            utc_time(2026, 5, 12, 13, 10),
1704            LimitReportOptions::default(),
1705        );
1706
1707        assert_eq!(report.status, "ok");
1708        assert_eq!(report.current.len(), 3);
1709        let current_weekly = report
1710            .current
1711            .iter()
1712            .find(|current| {
1713                current.window == "7d" && current.resets_at == Some(utc_time(2026, 5, 19, 9, 0))
1714            })
1715            .expect("current weekly");
1716        assert_eq!(current_weekly.status, CURRENT_STATUS_ACTIVE);
1717        assert_eq!(current_weekly.used_percent, Some(4.0));
1718        assert_eq!(current_weekly.remaining_percent, Some(96.0));
1719        assert_eq!(current_weekly.reset_in_seconds, Some(589_800));
1720
1721        let empty_input = RateLimitSamplesReport {
1722            start: utc_time(2026, 5, 1, 0, 0),
1723            end: utc_time(2026, 5, 1, 1, 0),
1724            sessions_dir: "/tmp/sessions".to_string(),
1725            samples: Vec::new(),
1726            diagnostics: RateLimitDiagnostics::default(),
1727        };
1728        let empty = build_limit_current_report(
1729            &empty_input,
1730            utc_time(2026, 5, 1, 1, 0),
1731            LimitReportOptions::default(),
1732        );
1733        assert_eq!(empty.status, "unobserved");
1734        assert!(empty.current.is_empty());
1735    }
1736
1737    #[test]
1738    fn current_report_shows_last_expired_window_when_no_cycle_is_active() {
1739        let input = fixture_samples_report();
1740
1741        let report = build_limit_current_report(
1742            &input,
1743            utc_time(2026, 5, 20, 0, 0),
1744            LimitReportOptions::default(),
1745        );
1746
1747        assert_eq!(report.status, CURRENT_STATUS_EXPIRED);
1748        assert_eq!(report.current.len(), 3);
1749        assert!(report
1750            .current
1751            .iter()
1752            .all(|window| window.status == CURRENT_STATUS_EXPIRED));
1753        let expired_weekly = report
1754            .current
1755            .iter()
1756            .find(|current| current.window == "7d")
1757            .expect("expired weekly");
1758        assert_eq!(expired_weekly.resets_at, Some(utc_time(2026, 5, 19, 9, 0)));
1759        assert_eq!(expired_weekly.last_seen, Some(utc_time(2026, 5, 12, 12, 0)));
1760        assert_eq!(expired_weekly.used_percent, Some(4.0));
1761        assert_eq!(expired_weekly.reset_in_seconds, None);
1762    }
1763
1764    #[test]
1765    fn samples_report_hides_source_by_default_and_exposes_it_for_verbose_diagnostics() {
1766        let input = fixture_samples_report();
1767
1768        let default_report = build_limit_samples_report(&input, LimitReportOptions::default());
1769        let default_value = serde_json::to_value(&default_report).expect("default json");
1770        assert_no_source_evidence(&default_value);
1771
1772        let verbose_report = build_limit_samples_report(
1773            &input,
1774            LimitReportOptions {
1775                include_diagnostics: true,
1776                include_source_evidence: true,
1777            },
1778        );
1779        let verbose_value = serde_json::to_value(&verbose_report).expect("verbose json");
1780        let evidence = verbose_value["diagnostics"]["sourceEvidence"]
1781            .as_array()
1782            .expect("source evidence");
1783        assert_eq!(evidence.len(), input.samples.len());
1784        assert!(evidence[0]["path"]
1785            .as_str()
1786            .expect("path")
1787            .contains("sessions"));
1788        assert!(evidence[0]["lineNumber"].as_u64().expect("line number") > 0);
1789    }
1790
1791    #[test]
1792    fn duplicate_samples_are_counted_without_changing_window_semantics() {
1793        let mut input = fixture_samples_report();
1794        input
1795            .samples
1796            .push(input.samples.first().expect("sample").clone());
1797
1798        let report = build_limit_windows_report(
1799            &input,
1800            LimitReportOptions {
1801                include_diagnostics: true,
1802                include_source_evidence: false,
1803            },
1804        );
1805
1806        assert_eq!(report.windows.len(), 9);
1807        assert_eq!(
1808            report
1809                .diagnostics
1810                .as_ref()
1811                .expect("diagnostics")
1812                .duplicate_samples,
1813            1
1814        );
1815    }
1816
1817    #[test]
1818    fn report_json_contains_expected_schema_keys() {
1819        let input = fixture_samples_report();
1820        let windows = serde_json::to_value(build_limit_windows_report(
1821            &input,
1822            LimitReportOptions::default(),
1823        ))
1824        .expect("windows json");
1825        assert_has_keys(
1826            &windows["windows"][0],
1827            &[
1828                "id",
1829                "accountId",
1830                "planType",
1831                "limitId",
1832                "window",
1833                "windowMinutes",
1834                "estimatedStart",
1835                "resetAt",
1836                "firstSeen",
1837                "lastSeen",
1838                "minUsedPercent",
1839                "maxUsedPercent",
1840                "lastUsedPercent",
1841                "sampleCount",
1842                "resetKind",
1843            ],
1844        );
1845
1846        let resets = serde_json::to_value(build_limit_resets_report(
1847            &input,
1848            false,
1849            LimitReportOptions::default(),
1850        ))
1851        .expect("resets json");
1852        assert_has_keys(
1853            &resets["resets"][0],
1854            &[
1855                "at",
1856                "accountId",
1857                "planType",
1858                "limitId",
1859                "window",
1860                "previousUsedPercent",
1861                "nextUsedPercent",
1862                "previousResetsAt",
1863                "nextResetsAt",
1864                "earlyBySeconds",
1865                "kind",
1866            ],
1867        );
1868
1869        let trend = serde_json::to_value(build_limit_trend_report(
1870            &input,
1871            None,
1872            LimitReportOptions::default(),
1873        ))
1874        .expect("trend json");
1875        assert_has_keys(
1876            &trend["changes"][0],
1877            &[
1878                "at",
1879                "window",
1880                "windowMinutes",
1881                "accountId",
1882                "planType",
1883                "limitId",
1884                "usedPercent",
1885                "remainingPercent",
1886                "deltaUsedPercent",
1887                "resetsAt",
1888                "kind",
1889            ],
1890        );
1891
1892        let current = serde_json::to_value(build_limit_current_report(
1893            &input,
1894            utc_time(2026, 5, 12, 13, 10),
1895            LimitReportOptions::default(),
1896        ))
1897        .expect("current json");
1898        assert_has_keys(
1899            &current["current"][0],
1900            &[
1901                "id",
1902                "status",
1903                "accountId",
1904                "planType",
1905                "limitId",
1906                "window",
1907                "windowMinutes",
1908                "lastSeen",
1909                "usedPercent",
1910                "remainingPercent",
1911                "resetsAt",
1912                "resetInSeconds",
1913            ],
1914        );
1915    }
1916
1917    fn fixture_samples_report() -> RateLimitSamplesReport {
1918        let codex_home =
1919            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/fixtures/rust-run/codex-home");
1920        read_rate_limit_samples_report(&RateLimitSamplesReadOptions {
1921            start: utc_time(2026, 5, 10, 0, 0),
1922            end: utc_time(2026, 5, 12, 14, 0),
1923            sessions_dir: codex_home.join("sessions"),
1924            scan_all_files: true,
1925            account_history_file: Some(codex_home.join("codex-ops/auth-account-history.json")),
1926            account_id: None,
1927            plan_type: None,
1928            window_minutes: None,
1929        })
1930        .expect("fixture samples")
1931    }
1932
1933    fn trend_samples_report() -> RateLimitSamplesReport {
1934        let first_reset = utc_time(2026, 5, 12, 17, 0);
1935        let next_reset = utc_time(2026, 5, 12, 18, 0);
1936        RateLimitSamplesReport {
1937            start: utc_time(2026, 5, 12, 12, 0),
1938            end: utc_time(2026, 5, 12, 13, 0),
1939            sessions_dir: "/tmp/sessions".to_string(),
1940            samples: vec![
1941                trend_sample(20.0, first_reset, 0),
1942                trend_sample(20.0, first_reset, 1),
1943                trend_sample(20.0, first_reset + Duration::seconds(1), 2),
1944                trend_sample(25.0, first_reset, 3),
1945                trend_sample(24.0, first_reset, 4),
1946                trend_sample(15.0, first_reset, 5),
1947                trend_sample(15.0, first_reset + Duration::seconds(1), 6),
1948                trend_sample(15.0, next_reset, 7),
1949            ],
1950            diagnostics: RateLimitDiagnostics::default(),
1951        }
1952    }
1953
1954    fn report_sample(
1955        limit_id: &str,
1956        timestamp: DateTime<Utc>,
1957        used_percent: f64,
1958        resets_at: DateTime<Utc>,
1959        window_minutes: i64,
1960    ) -> RateLimitSample {
1961        let window = match window_minutes {
1962            300 => "5h",
1963            10_080 => "7d",
1964            _ => "primary",
1965        };
1966        RateLimitSample {
1967            timestamp,
1968            session_id: format!("report-session-{}", timestamp.timestamp_millis()),
1969            account_id: Some("account-fixture".to_string()),
1970            plan_type: Some("pro".to_string()),
1971            limit_id: Some(limit_id.to_string()),
1972            window: window.to_string(),
1973            window_minutes,
1974            used_percent,
1975            remaining_percent: 100.0 - used_percent,
1976            resets_at,
1977            source: None,
1978        }
1979    }
1980
1981    fn reset_sample(
1982        limit_id: &str,
1983        minute_offset: i64,
1984        used_percent: f64,
1985        resets_at: DateTime<Utc>,
1986    ) -> RateLimitSample {
1987        let mut sample = unknown_limit_reset_sample(minute_offset, used_percent, resets_at);
1988        sample.limit_id = Some(limit_id.to_string());
1989        sample
1990    }
1991
1992    fn unknown_limit_reset_sample(
1993        minute_offset: i64,
1994        used_percent: f64,
1995        resets_at: DateTime<Utc>,
1996    ) -> RateLimitSample {
1997        RateLimitSample {
1998            timestamp: utc_time(2026, 5, 12, minute_offset as u32, 0),
1999            session_id: format!("reset-session-{minute_offset}"),
2000            account_id: Some("account-fixture".to_string()),
2001            plan_type: Some("pro".to_string()),
2002            limit_id: None,
2003            window: "7d".to_string(),
2004            window_minutes: 10_080,
2005            used_percent,
2006            remaining_percent: 100.0 - used_percent,
2007            resets_at,
2008            source: None,
2009        }
2010    }
2011
2012    fn trend_sample(
2013        used_percent: f64,
2014        resets_at: DateTime<Utc>,
2015        minute_offset: i64,
2016    ) -> RateLimitSample {
2017        RateLimitSample {
2018            timestamp: utc_time(2026, 5, 12, 12, minute_offset as u32),
2019            session_id: format!("trend-session-{minute_offset}"),
2020            account_id: Some("account-fixture".to_string()),
2021            plan_type: Some("pro".to_string()),
2022            limit_id: Some("fixture-trend-change".to_string()),
2023            window: "5h".to_string(),
2024            window_minutes: 300,
2025            used_percent,
2026            remaining_percent: 100.0 - used_percent,
2027            resets_at,
2028            source: None,
2029        }
2030    }
2031
2032    fn trend_window_sample(
2033        source: &str,
2034        source_line: usize,
2035        minute_offset: i64,
2036        used_percent: f64,
2037        resets_at: DateTime<Utc>,
2038        window_minutes: i64,
2039    ) -> RateLimitSample {
2040        let window = match window_minutes {
2041            300 => "5h",
2042            10_080 => "7d",
2043            _ => "primary",
2044        };
2045        RateLimitSample {
2046            timestamp: utc_time(2026, 5, 18, 0, minute_offset as u32),
2047            session_id: source.to_string(),
2048            account_id: Some("account-fixture".to_string()),
2049            plan_type: Some("pro".to_string()),
2050            limit_id: Some("fixture-trend-vector".to_string()),
2051            window: window.to_string(),
2052            window_minutes,
2053            used_percent,
2054            remaining_percent: 100.0 - used_percent,
2055            resets_at,
2056            source: Some(SourceSpan {
2057                path: format!("/tmp/{source}.jsonl"),
2058                line_number: source_line,
2059            }),
2060        }
2061    }
2062
2063    fn utc_time(year: i32, month: u32, day: u32, hour: u32, minute: u32) -> DateTime<Utc> {
2064        Utc.with_ymd_and_hms(year, month, day, hour, minute, 0)
2065            .single()
2066            .expect("valid UTC time")
2067    }
2068
2069    fn assert_has_keys(value: &Value, keys: &[&str]) {
2070        let object = value.as_object().expect("json object");
2071        for key in keys {
2072            assert!(object.contains_key(*key), "missing key {key}");
2073        }
2074    }
2075
2076    fn assert_no_source_evidence(value: &Value) {
2077        match value {
2078            Value::Object(object) => {
2079                for key in object.keys() {
2080                    assert!(
2081                        !matches!(
2082                            key.as_str(),
2083                            "source"
2084                                | "sourcePath"
2085                                | "sourceLine"
2086                                | "sourceEvidence"
2087                                | "filePath"
2088                                | "line"
2089                                | "lineNumber"
2090                        ),
2091                        "default report leaked source key {key}"
2092                    );
2093                }
2094                for child in object.values() {
2095                    assert_no_source_evidence(child);
2096                }
2097            }
2098            Value::Array(items) => {
2099                for item in items {
2100                    assert_no_source_evidence(item);
2101                }
2102            }
2103            _ => {}
2104        }
2105    }
2106}