synapse_pingora/profiler/
profile_store.rs1use std::collections::HashSet;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use dashmap::DashMap;
14use serde::{Deserialize, Serialize};
15
16use crate::profiler::endpoint_profile::EndpointProfile;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ProfileStoreConfig {
25 pub max_profiles: usize,
27 pub min_samples_for_detection: u32,
29 pub idle_timeout_ms: u64,
31 pub enable_segment_detection: bool,
33 pub dynamic_segment_threshold: usize,
35}
36
37impl Default for ProfileStoreConfig {
38 fn default() -> Self {
39 Self {
40 max_profiles: 10_000,
41 min_samples_for_detection: 100,
42 idle_timeout_ms: 24 * 60 * 60 * 1000, enable_segment_detection: true,
44 dynamic_segment_threshold: 10,
45 }
46 }
47}
48
49#[derive(Debug, Default)]
58pub struct SegmentCardinality {
59 segments: DashMap<usize, HashSet<String>>,
62 max_values: usize,
64}
65
66impl SegmentCardinality {
67 pub fn new(max_values: usize) -> Self {
69 Self {
70 segments: DashMap::new(),
71 max_values,
72 }
73 }
74
75 pub fn record(&self, position: usize, value: &str, threshold: usize) -> bool {
78 let mut entry = self.segments.entry(position).or_insert_with(HashSet::new);
79 let values = entry.value_mut();
80
81 if values.len() < self.max_values {
83 values.insert(value.to_string());
84 }
85
86 values.len() >= threshold
87 }
88
89 pub fn is_dynamic(&self, position: usize, threshold: usize) -> bool {
91 self.segments
92 .get(&position)
93 .map(|v| v.len() >= threshold)
94 .unwrap_or(false)
95 }
96
97 pub fn cardinality(&self, position: usize) -> usize {
99 self.segments.get(&position).map(|v| v.len()).unwrap_or(0)
100 }
101
102 pub fn clear(&self) {
104 self.segments.clear();
105 }
106}
107
108pub struct ProfileStore {
116 profiles: DashMap<String, EndpointProfile>,
118 config: ProfileStoreConfig,
120 segment_cardinality: SegmentCardinality,
122 total_created: AtomicU64,
124 total_evicted: AtomicU64,
126 last_eviction_ms: AtomicU64,
128}
129
130impl Default for ProfileStore {
131 fn default() -> Self {
132 Self::new(ProfileStoreConfig::default())
133 }
134}
135
136impl ProfileStore {
137 pub fn new(config: ProfileStoreConfig) -> Self {
139 let max_segment_values = config.dynamic_segment_threshold * 2;
140 Self {
141 profiles: DashMap::with_capacity(config.max_profiles / 2),
142 config,
143 segment_cardinality: SegmentCardinality::new(max_segment_values),
144 total_created: AtomicU64::new(0),
145 total_evicted: AtomicU64::new(0),
146 last_eviction_ms: AtomicU64::new(0),
147 }
148 }
149
150 pub fn config(&self) -> &ProfileStoreConfig {
152 &self.config
153 }
154
155 pub fn get_or_create(
159 &self,
160 path: &str,
161 ) -> dashmap::mapref::one::RefMut<'_, String, EndpointProfile> {
162 let template = if self.config.enable_segment_detection {
163 self.normalize_path(path)
164 } else {
165 path.to_string()
166 };
167
168 let now_ms = now_ms();
169
170 self.maybe_evict(now_ms);
172
173 self.profiles.entry(template.clone()).or_insert_with(|| {
174 self.total_created.fetch_add(1, Ordering::Relaxed);
175 EndpointProfile::new(template, now_ms)
176 })
177 }
178
179 pub fn get(
181 &self,
182 template: &str,
183 ) -> Option<dashmap::mapref::one::Ref<'_, String, EndpointProfile>> {
184 self.profiles.get(template)
185 }
186
187 pub fn contains(&self, template: &str) -> bool {
189 self.profiles.contains_key(template)
190 }
191
192 pub fn len(&self) -> usize {
194 self.profiles.len()
195 }
196
197 pub fn is_empty(&self) -> bool {
199 self.profiles.is_empty()
200 }
201
202 fn normalize_path(&self, path: &str) -> String {
206 let segments: Vec<&str> = path.split('/').collect();
207 let threshold = self.config.dynamic_segment_threshold;
208
209 let normalized: Vec<String> = segments
210 .iter()
211 .enumerate()
212 .map(|(pos, segment)| {
213 if segment.is_empty() {
214 return String::new();
215 }
216
217 let looks_dynamic = Self::looks_like_id(segment);
219
220 let is_high_cardinality = self.segment_cardinality.record(pos, segment, threshold);
222
223 if looks_dynamic || is_high_cardinality {
224 "{id}".to_string()
225 } else {
226 segment.to_string()
227 }
228 })
229 .collect();
230
231 normalized.join("/")
232 }
233
234 fn looks_like_id(segment: &str) -> bool {
236 if segment.is_empty() {
238 return false;
239 }
240
241 if segment.chars().all(|c| c.is_ascii_digit()) {
243 return !segment.is_empty() && segment.len() <= 20; }
245
246 if segment.len() == 36 && segment.chars().all(|c| c.is_ascii_hexdigit() || c == '-') {
248 return true;
249 }
250
251 if segment.len() >= 16 && segment.chars().all(|c| c.is_ascii_hexdigit()) {
253 return true;
254 }
255
256 if segment.len() == 24 && segment.chars().all(|c| c.is_ascii_hexdigit()) {
258 return true;
259 }
260
261 false
262 }
263
264 fn maybe_evict(&self, now_ms: u64) {
266 let last = self.last_eviction_ms.load(Ordering::Relaxed);
268 if now_ms.saturating_sub(last) < 1000 {
269 return;
270 }
271
272 if self.profiles.len() < self.config.max_profiles {
273 return;
274 }
275
276 self.last_eviction_ms.store(now_ms, Ordering::Relaxed);
277 self.evict_stale(now_ms);
278 }
279
280 fn evict_stale(&self, now_ms: u64) {
282 let idle_timeout = self.config.idle_timeout_ms;
283 let cutoff = now_ms.saturating_sub(idle_timeout);
284
285 let stale_keys: Vec<String> = self
287 .profiles
288 .iter()
289 .filter(|entry| entry.value().last_updated_ms < cutoff)
290 .map(|entry| entry.key().clone())
291 .take(100) .collect();
293
294 for key in stale_keys {
295 if self.profiles.remove(&key).is_some() {
296 self.total_evicted.fetch_add(1, Ordering::Relaxed);
297 }
298 }
299 }
300
301 pub fn clear(&self) {
303 self.profiles.clear();
304 self.segment_cardinality.clear();
305 }
306
307 pub fn metrics(&self) -> ProfileStoreMetrics {
309 ProfileStoreMetrics {
310 current_profiles: self.profiles.len(),
311 max_profiles: self.config.max_profiles,
312 total_created: self.total_created.load(Ordering::Relaxed),
313 total_evicted: self.total_evicted.load(Ordering::Relaxed),
314 }
315 }
316
317 pub fn list_templates(&self) -> Vec<String> {
319 self.profiles.iter().map(|e| e.key().clone()).collect()
320 }
321
322 pub fn get_profiles(&self) -> Vec<EndpointProfile> {
327 self.profiles.iter().map(|e| e.value().clone()).collect()
328 }
329
330 pub fn mature_profiles(&self) -> Vec<String> {
332 let min = self.config.min_samples_for_detection;
333 self.profiles
334 .iter()
335 .filter(|e| e.value().is_mature(min))
336 .map(|e| e.key().clone())
337 .collect()
338 }
339}
340
/// Point-in-time counters produced by `ProfileStore::metrics`.
#[derive(Debug, Clone, Serialize)]
pub struct ProfileStoreMetrics {
    /// Number of profiles stored when the snapshot was taken.
    pub current_profiles: usize,
    /// The configured `max_profiles` value (eviction trigger).
    pub max_profiles: usize,
    /// Number of profiles ever inserted.
    pub total_created: u64,
    /// Number of profiles removed by stale eviction.
    pub total_evicted: u64,
}
349
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
#[inline]
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
358
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_segment_cardinality_basic() {
        let tracker = SegmentCardinality::new(100);

        // Five distinct values stay below a threshold of 10.
        (0..5).for_each(|n| {
            tracker.record(0, &format!("value_{}", n), 10);
        });

        assert_eq!(tracker.cardinality(0), 5);
        assert!(!tracker.is_dynamic(0, 10));
    }

    #[test]
    fn test_segment_cardinality_threshold() {
        let tracker = SegmentCardinality::new(100);

        for n in 0..10 {
            let dynamic = tracker.record(0, &format!("value_{}", n), 10);
            // The 10th distinct value (n == 9) is the first to reach the threshold.
            assert_eq!(dynamic, n >= 9);
        }

        assert!(tracker.is_dynamic(0, 10));
    }

    #[test]
    fn test_profile_store_basic() {
        let store = ProfileStore::default();

        // Scope the guard so its shard lock is released before the checks below.
        {
            let mut profile = store.get_or_create("/api/users");
            profile.update(100, &[("name", "John")], Some("application/json"), now_ms());
        }

        assert_eq!(store.len(), 1);
        assert!(store.contains("/api/users"));
    }

    #[test]
    fn test_profile_store_path_normalization() {
        let store = ProfileStore::new(ProfileStoreConfig {
            enable_segment_detection: true,
            dynamic_segment_threshold: 2,
            ..Default::default()
        });

        // Numeric IDs collapse, so both paths share one template.
        store.get_or_create("/api/users/123/orders");
        store.get_or_create("/api/users/456/orders");

        assert_eq!(store.len(), 1);

        let templates = store.list_templates();
        assert!(templates[0].contains("{id}"));
    }

    #[test]
    fn test_looks_like_id() {
        let ids = [
            "123",
            "12345678901234567890",                 // 20 digits: longest numeric id accepted
            "550e8400-e29b-41d4-a716-446655440000", // UUID
            "abcdef1234567890",                     // 16 hex chars
            "507f1f77bcf86cd799439011",             // 24 hex chars (ObjectId-like)
        ];
        for id in &ids {
            assert!(ProfileStore::looks_like_id(id), "expected id: {}", id);
        }

        let not_ids = [
            "123456789012345678901", // 21 digits: too long
            "abcdef12345",           // hex, but shorter than 16
            "users",
            "api",
            "",
        ];
        for segment in &not_ids {
            assert!(!ProfileStore::looks_like_id(segment), "expected non-id: {}", segment);
        }
    }

    #[test]
    fn test_profile_store_without_normalization() {
        let store = ProfileStore::new(ProfileStoreConfig {
            enable_segment_detection: false,
            ..Default::default()
        });

        // Without normalization, each concrete path gets its own profile.
        store.get_or_create("/api/users/123");
        store.get_or_create("/api/users/456");

        assert_eq!(store.len(), 2);
    }

    #[test]
    fn test_profile_store_metrics() {
        let store = ProfileStore::default();

        for endpoint in (0..5).map(|n| format!("/api/endpoint_{}", n)) {
            store.get_or_create(&endpoint);
        }

        let ProfileStoreMetrics {
            current_profiles,
            total_created,
            total_evicted,
            ..
        } = store.metrics();
        assert_eq!(current_profiles, 5);
        assert_eq!(total_created, 5);
        assert_eq!(total_evicted, 0);
    }

    #[test]
    fn test_profile_store_clear() {
        let store = ProfileStore::default();

        for endpoint in (0..5).map(|n| format!("/api/endpoint_{}", n)) {
            store.get_or_create(&endpoint);
        }
        assert_eq!(store.len(), 5);

        store.clear();
        assert!(store.is_empty());
    }

    #[test]
    fn test_profile_store_mature_profiles() {
        let store = ProfileStore::new(ProfileStoreConfig {
            min_samples_for_detection: 10,
            enable_segment_detection: false,
            ..Default::default()
        });

        // (path, samples): only the first one reaches the 10-sample bar.
        for (path, samples) in [("/api/mature", 15), ("/api/immature", 5)].iter() {
            // The guard is dropped at the end of each iteration.
            let mut profile = store.get_or_create(path);
            for _ in 0..*samples {
                profile.update(100, &[], None, now_ms());
            }
        }

        let mature = store.mature_profiles();
        assert_eq!(mature.len(), 1);
        assert!(mature.iter().any(|template| template == "/api/mature"));
    }

    #[test]
    fn test_segment_cardinality_clear() {
        let tracker = SegmentCardinality::new(100);

        (0..10).for_each(|n| {
            tracker.record(0, &format!("value_{}", n), 20);
        });
        assert_eq!(tracker.cardinality(0), 10);

        tracker.clear();
        assert_eq!(tracker.cardinality(0), 0);
    }

    #[test]
    fn test_segment_cardinality_max_values() {
        // Only five values may be stored per position, however many are seen.
        let tracker = SegmentCardinality::new(5);

        (0..10).for_each(|n| {
            tracker.record(0, &format!("value_{}", n), 100);
        });

        assert_eq!(tracker.cardinality(0), 5);
    }
}