har/analysis/
rate_limit.rs1use crate::filter::Filter;
2use crate::model::{Capture, Entry};
3use ahash::AHashMap;
4use serde::Serialize;
5use std::collections::BTreeMap;
6
7#[derive(Debug, Serialize)]
8pub struct RateLimitResult {
9 pub groups: Vec<RateLimitGroup>,
10}
11
12#[derive(Debug, Serialize)]
13pub struct RateLimitGroup {
14 pub host: String,
15 pub norm_path: String,
16 pub count_429: usize,
17 pub retry_after_secs: Vec<f64>,
18 pub ratelimit_headers: BTreeMap<String, String>,
19 pub cooldown_violated: bool,
20 pub violating_ids: Vec<String>,
21 pub entry_ids: Vec<String>,
22}
23
24fn header<'a>(e: &'a Entry, name: &str) -> Option<&'a str> {
25 e.resp_headers
26 .iter()
27 .find(|(n, _)| n.eq_ignore_ascii_case(name))
28 .map(|(_, v)| v.as_str())
29}
30
31fn is_limited(e: &Entry) -> bool {
32 e.status == 429 || header(e, "x-ratelimit-remaining") == Some("0")
33}
34
35pub fn compute_rate_limit(cap: &Capture, filter: &Filter, top: usize) -> RateLimitResult {
37 let entries: Vec<&Entry> = cap.entries.iter().filter(|e| filter.matches(e)).collect();
38
39 let mut by_route: AHashMap<(String, String), Vec<&Entry>> = AHashMap::new();
41 for e in &entries {
42 by_route
43 .entry((e.host.clone(), e.norm_path.clone()))
44 .or_default()
45 .push(e);
46 }
47
48 let mut groups: Vec<RateLimitGroup> = Vec::new();
49 for ((host, np), members) in &by_route {
50 let limited: Vec<&&Entry> = members.iter().filter(|e| is_limited(e)).collect();
51 if limited.is_empty() {
52 continue;
53 }
54
55 let count_429 = limited.iter().filter(|e| e.status == 429).count();
56 let mut retry_after_secs: Vec<f64> = Vec::new();
57 let mut ratelimit_headers: BTreeMap<String, String> = BTreeMap::new();
58 let mut violating_ids: Vec<String> = Vec::new();
59
60 for lim in &limited {
61 if let Some(ra) = header(lim, "retry-after").and_then(|v| v.trim().parse::<f64>().ok())
62 {
63 retry_after_secs.push(ra);
64 let cooldown_end = lim.started_offset_ms + ra * 1000.0;
65 for e in members.iter() {
66 if e.started_offset_ms > lim.started_offset_ms
67 && e.started_offset_ms < cooldown_end
68 && !violating_ids.contains(&e.id)
69 {
70 violating_ids.push(e.id.clone());
71 }
72 }
73 }
74 for (n, v) in &lim.resp_headers {
75 let ln = n.to_ascii_lowercase();
76 if ln.starts_with("x-ratelimit") {
77 ratelimit_headers.entry(ln).or_insert_with(|| v.clone());
78 }
79 }
80 }
81 violating_ids.sort();
82
83 groups.push(RateLimitGroup {
84 host: host.clone(),
85 norm_path: np.clone(),
86 count_429,
87 retry_after_secs,
88 ratelimit_headers,
89 cooldown_violated: !violating_ids.is_empty(),
90 violating_ids,
91 entry_ids: limited.iter().map(|e| e.id.clone()).collect(),
92 });
93 }
94
95 groups.sort_by(|a, b| {
96 b.count_429
97 .cmp(&a.count_429)
98 .then(a.host.cmp(&b.host))
99 .then(a.norm_path.cmp(&b.norm_path))
100 });
101 groups.truncate(top);
102 RateLimitResult { groups }
103}
104
105pub fn render_rate_limit_text(r: &RateLimitResult) -> String {
107 let mut out = String::new();
108 out.push_str("== wiretrail rate-limit ==\n");
109 for g in &r.groups {
110 let tag = if g.cooldown_violated {
111 " [cooldown violated]"
112 } else {
113 ""
114 };
115 out.push_str(&format!(
116 "\n{} {} ({}x 429){}\n",
117 g.host, g.norm_path, g.count_429, tag
118 ));
119 if !g.retry_after_secs.is_empty() {
120 let ras: Vec<String> = g.retry_after_secs.iter().map(|s| format!("{s}s")).collect();
121 out.push_str(&format!(" retry-after: {}\n", ras.join(", ")));
122 }
123 for (k, v) in &g.ratelimit_headers {
124 out.push_str(&format!(" {k}: {v}\n"));
125 }
126 if !g.violating_ids.is_empty() {
127 out.push_str(&format!(
128 " called during cooldown: {}\n",
129 g.violating_ids.join(", ")
130 ));
131 }
132 }
133 out
134}
135
#[cfg(test)]
mod tests {
    use super::compute_rate_limit;
    use crate::filter::Filter;
    use crate::model::{Entry, sample_capture, sample_entry};

    /// Builds a 429 response for `GET api.x/data` that started at `offset_ms`
    /// and carries the given `Retry-After` value plus an exhausted
    /// `X-RateLimit-Remaining` header.
    fn limited(index: usize, offset_ms: f64, retry_after: &str) -> Entry {
        let mut entry = sample_entry(index, "api.x", "GET", "/data", 429);
        entry.started_offset_ms = offset_ms;
        entry.resp_headers = vec![
            ("Retry-After".to_string(), retry_after.to_string()),
            ("X-RateLimit-Remaining".to_string(), "0".to_string()),
        ];
        entry
    }

    /// Builds a 200 response for the same route that started at `offset_ms`.
    fn ok(index: usize, offset_ms: f64) -> Entry {
        let mut entry = sample_entry(index, "api.x", "GET", "/data", 200);
        entry.started_offset_ms = offset_ms;
        entry
    }

    /// A single 429 yields one group with its delay and headers collected.
    #[test]
    fn groups_429_and_parses_retry_after() {
        let cap = sample_capture(vec![limited(0, 0.0, "30")]);
        let result = compute_rate_limit(&cap, &Filter::parse(&[]).unwrap(), 10);

        assert_eq!(result.groups.len(), 1);
        let group = &result.groups[0];
        assert_eq!(group.count_429, 1);
        assert_eq!(group.retry_after_secs, vec![30.0]);

        let remaining = group
            .ratelimit_headers
            .get("x-ratelimit-remaining")
            .map(String::as_str);
        assert_eq!(remaining, Some("0"));
    }

    /// A call 2 s into a 10 s cooldown is reported as a violation.
    #[test]
    fn flags_cooldown_violation() {
        let cap = sample_capture(vec![limited(0, 0.0, "10"), ok(1, 2000.0)]);
        let result = compute_rate_limit(&cap, &Filter::parse(&[]).unwrap(), 10);

        let group = &result.groups[0];
        assert!(group.cooldown_violated);
        assert_eq!(group.violating_ids, vec!["e000001"]);
    }

    /// A call made after the 10 s cooldown has elapsed is not flagged.
    #[test]
    fn respected_cooldown_not_flagged() {
        let cap = sample_capture(vec![limited(0, 0.0, "10"), ok(1, 20000.0)]);
        let result = compute_rate_limit(&cap, &Filter::parse(&[]).unwrap(), 10);

        assert!(!result.groups[0].cooldown_violated);
    }
}