1use std::collections::VecDeque;
17use std::sync::{Arc, Mutex, OnceLock};
18
/// Process-wide key mixed into every label hash.
///
/// Filled in lazily by [`hash_key`] on first use and never changed afterwards, so hashes are
/// comparable for the lifetime of one process only.
static HASH_KEY: OnceLock<u64> = OnceLock::new();

/// Returns the per-process hashing key, generating it on the first call.
///
/// The key comes from a hasher built by [`std::collections::hash_map::RandomState`], which the
/// standard library seeds with random keys. The wall clock and the process id are fed into that
/// hasher as additional input, so the result is never weaker than a clock/PID-only seed (both of
/// which an observer can often guess) even on a platform where the random keys are poor.
///
/// Every call after the first returns the same value, from any thread.
fn hash_key() -> u64 {
    use std::hash::{BuildHasher, Hasher};

    *HASH_KEY.get_or_init(|| {
        // Truncating the nanosecond count to 64 bits is intentional: only the bits are wanted.
        // A clock set before the Unix epoch falls back to a fixed constant.
        let clock_nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .unwrap_or(0xcafe_babe_dead_beef);
        let pid_mix = u64::from(std::process::id()).wrapping_mul(0x517c_c1b7_2722_0a95);

        // Run the guessable seed through a randomly keyed hasher so the final key cannot be
        // reconstructed from the start time and PID alone.
        let mut hasher = std::collections::hash_map::RandomState::new().build_hasher();
        hasher.write_u64(clock_nanos ^ pid_mix);
        hasher.finish()
    })
}
37
38pub(crate) fn hash_label(label: &str) -> u64 {
40 hash_label_with_key(label, hash_key())
41}
42
/// Keyed 64-bit FNV-1a-style hash of `label`.
///
/// The running state starts at the FNV-1a 64-bit offset basis XORed with `key`. Each byte of
/// `label` is then XORed into the state, which is multiplied by the FNV prime using wrapping
/// arithmetic. With `key == 0` this is plain FNV-1a; an empty `label` yields the initial state.
fn hash_label_with_key(label: &str, key: u64) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01B3;

    label
        .bytes()
        .fold(key ^ FNV_OFFSET_BASIS, |state, byte| {
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        })
}
54
/// One recorded slow-query observation, as stored and returned by `SlowQueryStore`.
#[derive(Debug, Clone)]
pub struct SlowQueryEvent {
    /// Event timestamp in milliseconds; `SlowQueryFilter::since_ms` is compared against it.
    /// NOTE(review): presumably milliseconds since the Unix epoch; confirm with the producer.
    pub ts_ms: u64,
    /// Statement category label, matched exactly by `SlowQueryFilter::kind`.
    /// NOTE(review): the tests in this file use "select", "insert" and "delete"; the full set of
    /// values is decided by the producer.
    pub kind: &'static str,
    /// How long the query took, in milliseconds; `SlowQueryFilter::min_duration_ms` is compared
    /// against it.
    pub duration_ms: u64,
    /// SQL text of the query.
    /// NOTE(review): the name implies the producer has already redacted it; nothing in this
    /// file performs redaction, so verify at the call site.
    pub sql_redacted: String,
    /// Hash standing in for the tenant label.
    /// NOTE(review): presumably produced by `hash_label`; confirm with the producer.
    pub tenant_hash: u64,
    /// Hash standing in for the caller identity.
    /// NOTE(review): presumably produced by `hash_label`; confirm with the producer.
    pub identity_hash: u64,
}
76
/// Criteria for `SlowQueryStore::read`.
///
/// Every field is optional and the set criteria are combined with AND. The `Default` value sets
/// nothing, which selects the most recent events up to the default read limit.
#[derive(Debug, Default, Clone)]
pub struct SlowQueryFilter {
    /// Maximum number of events to return; `None` falls back to `DEFAULT_READ_LIMIT`.
    pub limit: Option<usize>,
    /// Inclusive lower bound on `SlowQueryEvent::ts_ms`; events with a smaller timestamp are
    /// skipped.
    pub since_ms: Option<u64>,
    /// Inclusive lower bound on `SlowQueryEvent::duration_ms`; shorter events are skipped.
    pub min_duration_ms: Option<u64>,
    /// When set, only events whose `kind` equals this string are returned.
    pub kind: Option<&'static str>,
}
96
/// Ring capacity, in events, offered as the default argument for `SlowQueryStore::new`.
/// NOTE(review): nothing visible in this file applies it automatically; callers pass it
/// explicitly.
pub const DEFAULT_CAP: usize = 10_000;

/// Number of events `SlowQueryStore::read` returns when the filter does not set `limit`.
const DEFAULT_READ_LIMIT: usize = 100;
106
107pub struct SlowQueryStore {
112 ring: Mutex<VecDeque<SlowQueryEvent>>,
113 cap: usize,
114}
115
116impl SlowQueryStore {
117 pub fn new(cap: usize) -> Arc<Self> {
118 Arc::new(Self {
119 ring: Mutex::new(VecDeque::with_capacity(cap.min(1024))),
120 cap,
121 })
122 }
123
124 pub fn push(&self, event: SlowQueryEvent) {
126 if let Ok(mut ring) = self.ring.lock() {
127 if ring.len() >= self.cap {
128 ring.pop_front();
129 }
130 ring.push_back(event);
131 }
132 }
133
134 pub fn read(&self, filter: &SlowQueryFilter) -> Vec<SlowQueryEvent> {
139 let limit = filter.limit.unwrap_or(DEFAULT_READ_LIMIT);
140 let Ok(ring) = self.ring.lock() else {
141 return vec![];
142 };
143
144 ring.iter()
145 .rev()
146 .filter(|e| {
147 if let Some(since) = filter.since_ms {
148 if e.ts_ms < since {
149 return false;
150 }
151 }
152 if let Some(min_dur) = filter.min_duration_ms {
153 if e.duration_ms < min_dur {
154 return false;
155 }
156 }
157 if let Some(kind) = filter.kind {
158 if e.kind != kind {
159 return false;
160 }
161 }
162 true
163 })
164 .take(limit)
165 .cloned()
166 .collect()
167 }
168
169 pub fn len(&self) -> usize {
171 self.ring.lock().map(|r| r.len()).unwrap_or(0)
172 }
173
174 pub fn is_empty(&self) -> bool {
175 self.len() == 0
176 }
177}
178
/// Unit tests for the slow-query ring and the keyed label hash.
///
/// Ordering tests assert the exact newest-first sequence rather than a weak `>=` relation, so a
/// regression in iteration order or eviction is caught even when the element count is right.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event with fixed tenant/identity hashes and SQL text derived from `ts_ms`.
    fn event(ts_ms: u64, kind: &'static str, duration_ms: u64) -> SlowQueryEvent {
        SlowQueryEvent {
            ts_ms,
            kind,
            duration_ms,
            sql_redacted: format!("SELECT {ts_ms} FROM t"),
            tenant_hash: hash_label_with_key("tenant_a", 0xdead_beef),
            identity_hash: hash_label_with_key("user_1", 0xdead_beef),
        }
    }

    /// Creates a default-capacity store pre-loaded with `(ts_ms, kind, duration_ms)` tuples,
    /// pushed in slice order.
    fn filled(events: &[(u64, &'static str, u64)]) -> Arc<SlowQueryStore> {
        let store = SlowQueryStore::new(DEFAULT_CAP);
        for &(ts, kind, dur) in events {
            store.push(event(ts, kind, dur));
        }
        store
    }

    /// Timestamps of `events`, in the order given.
    fn timestamps(events: &[SlowQueryEvent]) -> Vec<u64> {
        events.iter().map(|e| e.ts_ms).collect()
    }

    #[test]
    fn empty_store_returns_empty() {
        let store = SlowQueryStore::new(DEFAULT_CAP);
        assert!(store.read(&SlowQueryFilter::default()).is_empty());
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());
    }

    #[test]
    fn len_and_is_empty_track_pushes() {
        let store = SlowQueryStore::new(DEFAULT_CAP);
        assert!(store.is_empty());
        store.push(event(1000, "select", 100));
        assert_eq!(store.len(), 1);
        assert!(!store.is_empty());
        store.push(event(2000, "select", 100));
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn results_most_recent_first() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "select", 100),
            (3000, "select", 100),
        ]);
        let result = store.read(&SlowQueryFilter::default());
        assert_eq!(result.len(), 3);
        assert!(result[0].ts_ms >= result[1].ts_ms);
        assert!(result[1].ts_ms >= result[2].ts_ms);
        assert_eq!(timestamps(&result), vec![3000, 2000, 1000]);
    }

    #[test]
    fn limit_returns_n_most_recent() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "select", 100),
            (3000, "select", 100),
            (4000, "select", 100),
            (5000, "select", 100),
        ]);
        let result = store.read(&SlowQueryFilter {
            limit: Some(2),
            ..Default::default()
        });
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].ts_ms, 5000);
        assert_eq!(result[1].ts_ms, 4000);
    }

    #[test]
    fn limit_zero_returns_nothing() {
        let store = filled(&[(1000, "select", 100), (2000, "select", 100)]);
        let result = store.read(&SlowQueryFilter {
            limit: Some(0),
            ..Default::default()
        });
        assert!(result.is_empty());
        // Reading never removes anything from the ring.
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn since_ms_excludes_older_events() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "select", 100),
            (3000, "select", 100),
        ]);
        let result = store.read(&SlowQueryFilter {
            since_ms: Some(2000),
            ..Default::default()
        });
        assert_eq!(result.len(), 2);
        for e in &result {
            assert!(e.ts_ms >= 2000, "ts_ms {} below since_ms", e.ts_ms);
        }
        // The bound is inclusive: the event at exactly 2000 is kept.
        assert_eq!(timestamps(&result), vec![3000, 2000]);
    }

    #[test]
    fn min_duration_ms_excludes_fast_queries() {
        let store = filled(&[
            (1000, "select", 50),
            (2000, "select", 200),
            (3000, "select", 500),
        ]);
        let result = store.read(&SlowQueryFilter {
            min_duration_ms: Some(200),
            ..Default::default()
        });
        assert_eq!(result.len(), 2);
        for e in &result {
            assert!(e.duration_ms >= 200, "duration {} below min", e.duration_ms);
        }
        // The bound is inclusive: the 200 ms event is kept.
        assert_eq!(timestamps(&result), vec![3000, 2000]);
    }

    #[test]
    fn kind_filter_returns_only_matching() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "insert", 100),
            (3000, "select", 100),
            (4000, "delete", 100),
        ]);
        let result = store.read(&SlowQueryFilter {
            kind: Some("select"),
            ..Default::default()
        });
        assert_eq!(result.len(), 2);
        for e in &result {
            assert_eq!(e.kind, "select");
        }
        assert_eq!(timestamps(&result), vec![3000, 1000]);
    }

    #[test]
    fn combined_filters_are_conjunctive() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "select", 300),
            (3000, "insert", 300),
            (4000, "select", 300),
        ]);
        let result = store.read(&SlowQueryFilter {
            since_ms: Some(2000),
            min_duration_ms: Some(300),
            kind: Some("select"),
            limit: Some(10),
        });
        assert_eq!(result.len(), 2);
        for e in &result {
            assert_eq!(e.kind, "select");
            assert!(e.ts_ms >= 2000);
            assert!(e.duration_ms >= 300);
        }
        assert_eq!(timestamps(&result), vec![4000, 2000]);
    }

    #[test]
    fn ring_evicts_oldest_on_overflow() {
        let store = SlowQueryStore::new(3);
        for i in 0..5u64 {
            store.push(event(i * 1000, "select", 100));
        }
        assert_eq!(store.len(), 3);
        let result = store.read(&SlowQueryFilter::default());
        let tss: Vec<u64> = result.iter().map(|e| e.ts_ms).collect();
        assert!(!tss.contains(&0), "ts=0 should have been evicted");
        assert!(!tss.contains(&1000), "ts=1000 should have been evicted");
        assert!(tss.contains(&4000), "ts=4000 must be present");
        // Exactly the three newest events survive, newest first.
        assert_eq!(tss, vec![4000, 3000, 2000]);
    }

    #[test]
    fn default_limit_caps_read() {
        let store = SlowQueryStore::new(DEFAULT_CAP);
        let total = (DEFAULT_READ_LIMIT + 50) as u64;
        for i in 0..total {
            store.push(event(i * 1000, "select", 100));
        }
        let result = store.read(&SlowQueryFilter::default());
        assert_eq!(result.len(), DEFAULT_READ_LIMIT);
        // The cap keeps the newest events, not the oldest.
        assert_eq!(result[0].ts_ms, (total - 1) * 1000);
    }

    #[test]
    fn hash_stable_same_key() {
        assert_eq!(
            hash_label_with_key("my-tenant", 0xdead_beef),
            hash_label_with_key("my-tenant", 0xdead_beef),
        );
    }

    #[test]
    fn hash_different_inputs_differ() {
        assert_ne!(
            hash_label_with_key("tenant_a", 0xdead_beef),
            hash_label_with_key("tenant_b", 0xdead_beef),
        );
    }

    #[test]
    fn hash_different_keys_differ() {
        assert_ne!(
            hash_label_with_key("tenant", 0x1111),
            hash_label_with_key("tenant", 0x2222),
        );
    }

    #[test]
    fn hash_of_empty_label_is_keyed_offset_basis() {
        assert_eq!(
            hash_label_with_key("", 0xdead_beef),
            0xdead_beef_u64 ^ 0xcbf2_9ce4_8422_2325
        );
    }

    #[test]
    fn hash_label_uses_one_key_per_process() {
        assert_eq!(hash_label("tenant_a"), hash_label("tenant_a"));
        assert_eq!(
            hash_label("tenant_a"),
            hash_label_with_key("tenant_a", hash_key())
        );
    }
}