Skip to main content

har/analysis/
storms.rs

1use crate::filter::Filter;
2use crate::grouping::densest_window;
3use crate::model::{Capture, Entry};
4use crate::render::human_ms;
5use ahash::AHashMap;
6use serde::Serialize;
7
8#[derive(Debug, Serialize)]
9pub struct StormsResult {
10    pub storms: Vec<Storm>,
11}
12
13#[derive(Debug, Serialize)]
14pub struct Storm {
15    pub scope_kind: String, // "host" | "endpoint"
16    pub scope: String,
17    pub peak_count: usize,
18    pub window_ms: u64,
19    pub first_offset_ms: f64,
20    pub last_offset_ms: f64,
21    pub calls_per_sec: f64,
22    pub entry_ids: Vec<String>,
23}
24
25/// Detect bursts of many calls to the same host or endpoint within `window_ms`.
26pub fn compute_storms(
27    cap: &Capture,
28    filter: &Filter,
29    window_ms: u64,
30    min_count: usize,
31    top: usize,
32) -> StormsResult {
33    let entries: Vec<&Entry> = cap.entries.iter().filter(|e| filter.matches(e)).collect();
34
35    let mut by_host: AHashMap<String, Vec<&Entry>> = AHashMap::new();
36    let mut by_endpoint: AHashMap<(String, String), Vec<&Entry>> = AHashMap::new();
37    for e in &entries {
38        by_host.entry(e.host.clone()).or_default().push(e);
39        by_endpoint
40            .entry((e.host.clone(), e.norm_path.clone()))
41            .or_default()
42            .push(e);
43    }
44
45    let mut storms = Vec::new();
46    for (host, mut g) in by_host {
47        sort_by_offset(&mut g);
48        if let Some(s) = storm_for("host", host, &g, window_ms, min_count) {
49            storms.push(s);
50        }
51    }
52    for ((host, np), mut g) in by_endpoint {
53        sort_by_offset(&mut g);
54        if let Some(s) = storm_for("endpoint", format!("{host}{np}"), &g, window_ms, min_count) {
55            storms.push(s);
56        }
57    }
58
59    storms.sort_by(|a, b| {
60        b.peak_count
61            .cmp(&a.peak_count)
62            .then(a.scope.cmp(&b.scope))
63            .then(a.scope_kind.cmp(&b.scope_kind))
64    });
65    storms.truncate(top);
66    StormsResult { storms }
67}
68
69fn sort_by_offset(g: &mut [&Entry]) {
70    g.sort_by(|a, b| {
71        a.started_offset_ms
72            .partial_cmp(&b.started_offset_ms)
73            .unwrap_or(std::cmp::Ordering::Equal)
74            .then(a.index.cmp(&b.index))
75    });
76}
77
78fn storm_for(
79    kind: &str,
80    scope: String,
81    g: &[&Entry],
82    window_ms: u64,
83    min_count: usize,
84) -> Option<Storm> {
85    let (count, l, r) = densest_window(g, window_ms as f64);
86    if count < min_count {
87        return None;
88    }
89    let win = &g[l..=r];
90    Some(Storm {
91        scope_kind: kind.to_string(),
92        scope,
93        peak_count: count,
94        window_ms,
95        first_offset_ms: win.first().unwrap().started_offset_ms,
96        last_offset_ms: win.last().unwrap().started_offset_ms,
97        calls_per_sec: count as f64 * 1000.0 / window_ms as f64,
98        entry_ids: win.iter().map(|e| e.id.clone()).collect(),
99    })
100}
101
102/// Render storms as deterministic terminal text.
103pub fn render_storms_text(r: &StormsResult) -> String {
104    let mut out = String::new();
105    out.push_str("== wiretrail storms ==\n");
106    for s in &r.storms {
107        out.push_str(&format!(
108            "\n{} {}  {} calls in {} ({:.1}/s)\n",
109            s.scope_kind,
110            s.scope,
111            s.peak_count,
112            human_ms(s.window_ms as f64),
113            s.calls_per_sec
114        ));
115        out.push_str(&format!(
116            "  window: {} - {}\n",
117            human_ms(s.first_offset_ms),
118            human_ms(s.last_offset_ms)
119        ));
120        out.push_str(&format!("  entries: {}\n", s.entry_ids.join(", ")));
121    }
122    out
123}
124
#[cfg(test)]
mod tests {
    use super::compute_storms;
    use crate::filter::Filter;
    use crate::model::{Capture, Entry, sample_capture, sample_entry};

    /// Sample entry (built with `"GET"` and `200`) whose start offset is
    /// overridden with `offset_ms`.
    fn at(index: usize, host: &str, path: &str, offset_ms: f64) -> Entry {
        let mut entry = sample_entry(index, host, "GET", path, 200);
        entry.started_offset_ms = offset_ms;
        entry
    }

    /// Six calls to `h/x`, `step_ms` apart, starting at offset 0.
    fn spaced(step_ms: f64) -> Vec<Entry> {
        (0..6usize)
            .map(|i| at(i, "h", "/x", i as f64 * step_ms))
            .collect()
    }

    /// Six calls to the same endpoint, all within 250ms.
    fn burst() -> Capture {
        sample_capture(spaced(50.0))
    }

    /// Filter parsed from an empty argument list.
    fn no_filter() -> Filter {
        Filter::parse(&[]).unwrap()
    }

    #[test]
    fn detects_endpoint_and_host_burst() {
        let r = compute_storms(&burst(), &no_filter(), 1000, 5, 10);
        let has_full_burst = |kind: &str| {
            r.storms
                .iter()
                .any(|s| s.scope_kind == kind && s.peak_count == 6)
        };
        assert!(has_full_burst("endpoint"));
        assert!(has_full_burst("host"));
    }

    #[test]
    fn no_storm_when_spread_out() {
        // Calls are 1s apart, so a 500ms window never holds 5 of them.
        let cap = sample_capture(spaced(1000.0));
        let r = compute_storms(&cap, &no_filter(), 500, 5, 10);
        assert!(r.storms.is_empty());
    }

    #[test]
    fn min_count_gates() {
        // The burst only has 6 calls, but 7 are required.
        let r = compute_storms(&burst(), &no_filter(), 1000, 7, 10);
        assert!(r.storms.is_empty());
    }
}