1use crate::filter::Filter;
2use crate::grouping::densest_window;
3use crate::model::{Capture, Entry};
4use crate::render::human_ms;
5use ahash::AHashMap;
6use serde::Serialize;
7
8#[derive(Debug, Serialize)]
9pub struct StormsResult {
10 pub storms: Vec<Storm>,
11}
12
13#[derive(Debug, Serialize)]
14pub struct Storm {
15 pub scope_kind: String, pub scope: String,
17 pub peak_count: usize,
18 pub window_ms: u64,
19 pub first_offset_ms: f64,
20 pub last_offset_ms: f64,
21 pub calls_per_sec: f64,
22 pub entry_ids: Vec<String>,
23}
24
25pub fn compute_storms(
27 cap: &Capture,
28 filter: &Filter,
29 window_ms: u64,
30 min_count: usize,
31 top: usize,
32) -> StormsResult {
33 let entries: Vec<&Entry> = cap.entries.iter().filter(|e| filter.matches(e)).collect();
34
35 let mut by_host: AHashMap<String, Vec<&Entry>> = AHashMap::new();
36 let mut by_endpoint: AHashMap<(String, String), Vec<&Entry>> = AHashMap::new();
37 for e in &entries {
38 by_host.entry(e.host.clone()).or_default().push(e);
39 by_endpoint
40 .entry((e.host.clone(), e.norm_path.clone()))
41 .or_default()
42 .push(e);
43 }
44
45 let mut storms = Vec::new();
46 for (host, mut g) in by_host {
47 sort_by_offset(&mut g);
48 if let Some(s) = storm_for("host", host, &g, window_ms, min_count) {
49 storms.push(s);
50 }
51 }
52 for ((host, np), mut g) in by_endpoint {
53 sort_by_offset(&mut g);
54 if let Some(s) = storm_for("endpoint", format!("{host}{np}"), &g, window_ms, min_count) {
55 storms.push(s);
56 }
57 }
58
59 storms.sort_by(|a, b| {
60 b.peak_count
61 .cmp(&a.peak_count)
62 .then(a.scope.cmp(&b.scope))
63 .then(a.scope_kind.cmp(&b.scope_kind))
64 });
65 storms.truncate(top);
66 StormsResult { storms }
67}
68
69fn sort_by_offset(g: &mut [&Entry]) {
70 g.sort_by(|a, b| {
71 a.started_offset_ms
72 .partial_cmp(&b.started_offset_ms)
73 .unwrap_or(std::cmp::Ordering::Equal)
74 .then(a.index.cmp(&b.index))
75 });
76}
77
78fn storm_for(
79 kind: &str,
80 scope: String,
81 g: &[&Entry],
82 window_ms: u64,
83 min_count: usize,
84) -> Option<Storm> {
85 let (count, l, r) = densest_window(g, window_ms as f64);
86 if count < min_count {
87 return None;
88 }
89 let win = &g[l..=r];
90 Some(Storm {
91 scope_kind: kind.to_string(),
92 scope,
93 peak_count: count,
94 window_ms,
95 first_offset_ms: win.first().unwrap().started_offset_ms,
96 last_offset_ms: win.last().unwrap().started_offset_ms,
97 calls_per_sec: count as f64 * 1000.0 / window_ms as f64,
98 entry_ids: win.iter().map(|e| e.id.clone()).collect(),
99 })
100}
101
102pub fn render_storms_text(r: &StormsResult) -> String {
104 let mut out = String::new();
105 out.push_str("== wiretrail storms ==\n");
106 for s in &r.storms {
107 out.push_str(&format!(
108 "\n{} {} {} calls in {} ({:.1}/s)\n",
109 s.scope_kind,
110 s.scope,
111 s.peak_count,
112 human_ms(s.window_ms as f64),
113 s.calls_per_sec
114 ));
115 out.push_str(&format!(
116 " window: {} - {}\n",
117 human_ms(s.first_offset_ms),
118 human_ms(s.last_offset_ms)
119 ));
120 out.push_str(&format!(" entries: {}\n", s.entry_ids.join(", ")));
121 }
122 out
123}
124
125#[cfg(test)]
126mod tests {
127 use super::compute_storms;
128 use crate::filter::Filter;
129 use crate::model::{Capture, Entry, sample_capture, sample_entry};
130
131 fn at(index: usize, host: &str, path: &str, offset_ms: f64) -> Entry {
132 let mut e = sample_entry(index, host, "GET", path, 200);
133 e.started_offset_ms = offset_ms;
134 e
135 }
136
137 fn burst() -> Capture {
138 let mut es = Vec::new();
140 for i in 0..6 {
141 es.push(at(i, "h", "/x", i as f64 * 50.0));
142 }
143 sample_capture(es)
144 }
145
146 #[test]
147 fn detects_endpoint_and_host_burst() {
148 let r = compute_storms(&burst(), &Filter::parse(&[]).unwrap(), 1000, 5, 10);
149 assert!(
150 r.storms
151 .iter()
152 .any(|s| s.scope_kind == "endpoint" && s.peak_count == 6)
153 );
154 assert!(
155 r.storms
156 .iter()
157 .any(|s| s.scope_kind == "host" && s.peak_count == 6)
158 );
159 }
160
161 #[test]
162 fn no_storm_when_spread_out() {
163 let mut es = Vec::new();
164 for i in 0..6 {
165 es.push(at(i, "h", "/x", i as f64 * 1000.0)); }
167 let r = compute_storms(
168 &sample_capture(es),
169 &Filter::parse(&[]).unwrap(),
170 500,
171 5,
172 10,
173 );
174 assert!(r.storms.is_empty());
175 }
176
177 #[test]
178 fn min_count_gates() {
179 let r = compute_storms(&burst(), &Filter::parse(&[]).unwrap(), 1000, 7, 10);
180 assert!(r.storms.is_empty()); }
182}