Skip to main content

synapse_pingora/profiler/
profile_store.rs

1//! Thread-safe profile storage with LRU eviction.
2//!
3//! Provides concurrent access to endpoint profiles using DashMap.
4//! Includes dynamic path segment detection for template normalization.
5//!
6//! ## Memory Budget
7//! Default: 10,000 profiles * ~2KB = ~20MB
8
9use std::collections::HashSet;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use dashmap::DashMap;
14use serde::{Deserialize, Serialize};
15
16use crate::profiler::endpoint_profile::EndpointProfile;
17
18// ============================================================================
19// Configuration
20// ============================================================================
21
/// Configuration for profile storage.
///
/// Consumed by `ProfileStore::new`. Derives serde's `Serialize`/`Deserialize`
/// in addition to `Debug` and `Clone`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfileStoreConfig {
    /// Maximum number of endpoint profiles to store.
    ///
    /// `ProfileStore` starts running eviction passes once the profile count
    /// reaches this value, and pre-sizes its map to half of it.
    pub max_profiles: usize,
    /// Minimum samples before profile is used for anomaly detection.
    ///
    /// Passed to `EndpointProfile::is_mature` by `ProfileStore::mature_profiles`.
    pub min_samples_for_detection: u32,
    /// Profile idle timeout (ms) - profiles not seen for this long are eviction candidates.
    ///
    /// Compared against each profile's `last_updated_ms` during an eviction pass.
    pub idle_timeout_ms: u64,
    /// Enable dynamic path segment detection.
    ///
    /// When `false`, `ProfileStore::get_or_create` keys profiles by the raw path.
    pub enable_segment_detection: bool,
    /// Cardinality threshold for path segment to be considered dynamic.
    ///
    /// A segment position is treated as dynamic once at least this many
    /// distinct values have been recorded for it.
    pub dynamic_segment_threshold: usize,
}
36
37impl Default for ProfileStoreConfig {
38    fn default() -> Self {
39        Self {
40            max_profiles: 10_000,
41            min_samples_for_detection: 100,
42            idle_timeout_ms: 24 * 60 * 60 * 1000, // 24 hours
43            enable_segment_detection: true,
44            dynamic_segment_threshold: 10,
45        }
46    }
47}
48
49// ============================================================================
50// SegmentCardinality - Dynamic path segment detection
51// ============================================================================
52
/// Tracks unique values for each path segment position to detect dynamic segments.
///
/// Example: If position 2 in "/api/users/{id}" sees 100+ unique values,
/// that segment is marked as dynamic (variable).
///
/// The actual threshold is supplied by the caller on every `record` /
/// `is_dynamic` call; this type only stores the observed values.
///
/// NOTE: the derived `Default` yields `max_values == 0`, and `record` only
/// stores a value while the set is smaller than `max_values`, so a
/// default-constructed tracker never stores anything. Use
/// `SegmentCardinality::new` to get a working tracker.
#[derive(Debug, Default)]
pub struct SegmentCardinality {
    /// Position -> Set of unique values seen
    /// Uses HashSet with capacity limit for memory protection
    /// (the limit is `max_values`, enforced in `record`).
    segments: DashMap<usize, HashSet<String>>,
    /// Maximum unique values to track per position
    max_values: usize,
}
65
66impl SegmentCardinality {
67    /// Create a new cardinality tracker.
68    pub fn new(max_values: usize) -> Self {
69        Self {
70            segments: DashMap::new(),
71            max_values,
72        }
73    }
74
75    /// Record a path segment value at a position.
76    /// Returns true if the segment appears to be dynamic (high cardinality).
77    pub fn record(&self, position: usize, value: &str, threshold: usize) -> bool {
78        let mut entry = self.segments.entry(position).or_insert_with(HashSet::new);
79        let values = entry.value_mut();
80
81        // Don't track beyond max_values (memory protection)
82        if values.len() < self.max_values {
83            values.insert(value.to_string());
84        }
85
86        values.len() >= threshold
87    }
88
89    /// Check if a position appears dynamic (has high cardinality).
90    pub fn is_dynamic(&self, position: usize, threshold: usize) -> bool {
91        self.segments
92            .get(&position)
93            .map(|v| v.len() >= threshold)
94            .unwrap_or(false)
95    }
96
97    /// Get cardinality at a position.
98    pub fn cardinality(&self, position: usize) -> usize {
99        self.segments.get(&position).map(|v| v.len()).unwrap_or(0)
100    }
101
102    /// Clear all tracked data.
103    pub fn clear(&self) {
104        self.segments.clear();
105    }
106}
107
108// ============================================================================
109// ProfileStore - Thread-safe storage
110// ============================================================================
111
/// Thread-safe storage for endpoint profiles.
///
/// Uses DashMap for concurrent access.
/// NOTE(review): DashMap is a sharded, lock-based map, so access is
/// concurrent but not strictly lock-free; guards returned from the map hold
/// a shard lock until dropped.
pub struct ProfileStore {
    /// Profiles by template path.
    profiles: DashMap<String, EndpointProfile>,
    /// Configuration.
    config: ProfileStoreConfig,
    /// Segment cardinality tracker for dynamic path detection.
    segment_cardinality: SegmentCardinality,
    /// Total profiles created (lifetime).
    total_created: AtomicU64,
    /// Total profiles evicted (lifetime).
    total_evicted: AtomicU64,
    /// Last eviction timestamp (ms). Starts at 0 and is used to rate-limit
    /// eviction passes.
    last_eviction_ms: AtomicU64,
}
129
130impl Default for ProfileStore {
131    fn default() -> Self {
132        Self::new(ProfileStoreConfig::default())
133    }
134}
135
136impl ProfileStore {
137    /// Create a new profile store with configuration.
138    pub fn new(config: ProfileStoreConfig) -> Self {
139        let max_segment_values = config.dynamic_segment_threshold * 2;
140        Self {
141            profiles: DashMap::with_capacity(config.max_profiles / 2),
142            config,
143            segment_cardinality: SegmentCardinality::new(max_segment_values),
144            total_created: AtomicU64::new(0),
145            total_evicted: AtomicU64::new(0),
146            last_eviction_ms: AtomicU64::new(0),
147        }
148    }
149
150    /// Get configuration.
151    pub fn config(&self) -> &ProfileStoreConfig {
152        &self.config
153    }
154
155    /// Get or create a profile for a path.
156    ///
157    /// Normalizes the path to a template if dynamic segment detection is enabled.
158    pub fn get_or_create(
159        &self,
160        path: &str,
161    ) -> dashmap::mapref::one::RefMut<'_, String, EndpointProfile> {
162        let template = if self.config.enable_segment_detection {
163            self.normalize_path(path)
164        } else {
165            path.to_string()
166        };
167
168        let now_ms = now_ms();
169
170        // Check capacity and evict if needed
171        self.maybe_evict(now_ms);
172
173        self.profiles.entry(template.clone()).or_insert_with(|| {
174            self.total_created.fetch_add(1, Ordering::Relaxed);
175            EndpointProfile::new(template, now_ms)
176        })
177    }
178
179    /// Get an existing profile (read-only).
180    pub fn get(
181        &self,
182        template: &str,
183    ) -> Option<dashmap::mapref::one::Ref<'_, String, EndpointProfile>> {
184        self.profiles.get(template)
185    }
186
187    /// Check if a profile exists.
188    pub fn contains(&self, template: &str) -> bool {
189        self.profiles.contains_key(template)
190    }
191
192    /// Get number of stored profiles.
193    pub fn len(&self) -> usize {
194        self.profiles.len()
195    }
196
197    /// Check if store is empty.
198    pub fn is_empty(&self) -> bool {
199        self.profiles.is_empty()
200    }
201
202    /// Normalize a path to a template by replacing dynamic segments.
203    ///
204    /// Example: "/api/users/12345/orders" -> "/api/users/{id}/orders"
205    fn normalize_path(&self, path: &str) -> String {
206        let segments: Vec<&str> = path.split('/').collect();
207        let threshold = self.config.dynamic_segment_threshold;
208
209        let normalized: Vec<String> = segments
210            .iter()
211            .enumerate()
212            .map(|(pos, segment)| {
213                if segment.is_empty() {
214                    return String::new();
215                }
216
217                // Check if this looks like an ID (numeric, UUID-like, etc.)
218                let looks_dynamic = Self::looks_like_id(segment);
219
220                // Record cardinality
221                let is_high_cardinality = self.segment_cardinality.record(pos, segment, threshold);
222
223                if looks_dynamic || is_high_cardinality {
224                    "{id}".to_string()
225                } else {
226                    segment.to_string()
227                }
228            })
229            .collect();
230
231        normalized.join("/")
232    }
233
234    /// Check if a segment looks like an ID (numeric, UUID, hex hash, etc.).
235    fn looks_like_id(segment: &str) -> bool {
236        // Empty segments are not IDs
237        if segment.is_empty() {
238            return false;
239        }
240
241        // Pure numeric (user IDs, etc.)
242        if segment.chars().all(|c| c.is_ascii_digit()) {
243            return !segment.is_empty() && segment.len() <= 20; // Reasonable ID length
244        }
245
246        // UUID format: 8-4-4-4-12 hex
247        if segment.len() == 36 && segment.chars().all(|c| c.is_ascii_hexdigit() || c == '-') {
248            return true;
249        }
250
251        // Hex string (16+ chars, common for hashes/tokens)
252        if segment.len() >= 16 && segment.chars().all(|c| c.is_ascii_hexdigit()) {
253            return true;
254        }
255
256        // MongoDB ObjectId (24 hex chars)
257        if segment.len() == 24 && segment.chars().all(|c| c.is_ascii_hexdigit()) {
258            return true;
259        }
260
261        false
262    }
263
264    /// Evict profiles if at capacity.
265    fn maybe_evict(&self, now_ms: u64) {
266        // Only check eviction periodically (every 1000ms)
267        let last = self.last_eviction_ms.load(Ordering::Relaxed);
268        if now_ms.saturating_sub(last) < 1000 {
269            return;
270        }
271
272        if self.profiles.len() < self.config.max_profiles {
273            return;
274        }
275
276        self.last_eviction_ms.store(now_ms, Ordering::Relaxed);
277        self.evict_stale(now_ms);
278    }
279
280    /// Evict stale profiles (not seen within idle timeout).
281    fn evict_stale(&self, now_ms: u64) {
282        let idle_timeout = self.config.idle_timeout_ms;
283        let cutoff = now_ms.saturating_sub(idle_timeout);
284
285        // Collect keys to remove (to avoid holding refs during iteration)
286        let stale_keys: Vec<String> = self
287            .profiles
288            .iter()
289            .filter(|entry| entry.value().last_updated_ms < cutoff)
290            .map(|entry| entry.key().clone())
291            .take(100) // Batch size
292            .collect();
293
294        for key in stale_keys {
295            if self.profiles.remove(&key).is_some() {
296                self.total_evicted.fetch_add(1, Ordering::Relaxed);
297            }
298        }
299    }
300
301    /// Clear all profiles.
302    pub fn clear(&self) {
303        self.profiles.clear();
304        self.segment_cardinality.clear();
305    }
306
307    /// Get store metrics.
308    pub fn metrics(&self) -> ProfileStoreMetrics {
309        ProfileStoreMetrics {
310            current_profiles: self.profiles.len(),
311            max_profiles: self.config.max_profiles,
312            total_created: self.total_created.load(Ordering::Relaxed),
313            total_evicted: self.total_evicted.load(Ordering::Relaxed),
314        }
315    }
316
317    /// List all profile templates.
318    pub fn list_templates(&self) -> Vec<String> {
319        self.profiles.iter().map(|e| e.key().clone()).collect()
320    }
321
322    /// Get all profiles as a snapshot.
323    ///
324    /// Returns a clone of all profiles currently in storage.
325    /// Useful for persistence and state export.
326    pub fn get_profiles(&self) -> Vec<EndpointProfile> {
327        self.profiles.iter().map(|e| e.value().clone()).collect()
328    }
329
330    /// Get mature profiles (those with enough samples for detection).
331    pub fn mature_profiles(&self) -> Vec<String> {
332        let min = self.config.min_samples_for_detection;
333        self.profiles
334            .iter()
335            .filter(|e| e.value().is_mature(min))
336            .map(|e| e.key().clone())
337            .collect()
338    }
339}
340
/// Profile store metrics.
///
/// Point-in-time snapshot produced by `ProfileStore::metrics`.
#[derive(Debug, Clone, Serialize)]
pub struct ProfileStoreMetrics {
    /// Number of profiles in the store when the snapshot was taken.
    pub current_profiles: usize,
    /// Configured `max_profiles` value.
    pub max_profiles: usize,
    /// Profiles created over the lifetime of the store.
    pub total_created: u64,
    /// Profiles evicted over the lifetime of the store.
    pub total_evicted: u64,
}
349
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Falls back to 0 if the system clock reports a time before the epoch.
#[inline]
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
358
359// ============================================================================
360// Tests
361// ============================================================================
362
#[cfg(test)]
mod tests {
    use super::*;

    /// Record `count` distinct synthetic values at position 0.
    fn fill_position_zero(tracker: &SegmentCardinality, count: usize, threshold: usize) {
        for n in 0..count {
            tracker.record(0, &format!("value_{}", n), threshold);
        }
    }

    /// Create `count` distinct endpoint profiles in `store`.
    fn create_endpoints(store: &ProfileStore, count: usize) {
        for n in 0..count {
            store.get_or_create(&format!("/api/endpoint_{}", n));
        }
    }

    #[test]
    fn test_segment_cardinality_basic() {
        let tracker = SegmentCardinality::new(100);

        // Five values stay below a threshold of ten.
        fill_position_zero(&tracker, 5, 10);

        assert_eq!(tracker.cardinality(0), 5);
        assert!(!tracker.is_dynamic(0, 10));
    }

    #[test]
    fn test_segment_cardinality_threshold() {
        let tracker = SegmentCardinality::new(100);

        // Only the tenth distinct value tips the position over the threshold.
        for n in 0..10 {
            let reported_dynamic = tracker.record(0, &format!("value_{}", n), 10);
            assert_eq!(reported_dynamic, n >= 9);
        }

        assert!(tracker.is_dynamic(0, 10));
    }

    #[test]
    fn test_profile_store_basic() {
        let store = ProfileStore::default();

        {
            // Scope the guard so it is released before the assertions.
            let mut profile = store.get_or_create("/api/users");
            profile.update(100, &[("name", "John")], Some("application/json"), now_ms());
        }

        assert_eq!(store.len(), 1);
        assert!(store.contains("/api/users"));
    }

    #[test]
    fn test_profile_store_path_normalization() {
        let store = ProfileStore::new(ProfileStoreConfig {
            enable_segment_detection: true,
            dynamic_segment_threshold: 2,
            ..Default::default()
        });

        // Two paths that differ only in a numeric ID segment.
        for path in &["/api/users/123/orders", "/api/users/456/orders"] {
            store.get_or_create(path);
        }

        // Both collapse into a single template containing the placeholder.
        assert_eq!(store.len(), 1);

        let templates = store.list_templates();
        assert!(templates[0].contains("{id}"));
    }

    #[test]
    fn test_looks_like_id() {
        // Numeric IDs (up to 20 digits).
        assert!(ProfileStore::looks_like_id("123"));
        assert!(ProfileStore::looks_like_id("12345678901234567890"));
        assert!(!ProfileStore::looks_like_id("123456789012345678901")); // Too long

        // UUIDs
        assert!(ProfileStore::looks_like_id(
            "550e8400-e29b-41d4-a716-446655440000"
        ));

        // Hex hashes
        assert!(ProfileStore::looks_like_id("abcdef1234567890"));
        assert!(!ProfileStore::looks_like_id("abcdef12345")); // Too short

        // MongoDB ObjectId
        assert!(ProfileStore::looks_like_id("507f1f77bcf86cd799439011"));

        // Plain words and the empty string are not IDs.
        for word in &["users", "api", ""] {
            assert!(!ProfileStore::looks_like_id(word));
        }
    }

    #[test]
    fn test_profile_store_without_normalization() {
        let store = ProfileStore::new(ProfileStoreConfig {
            enable_segment_detection: false,
            ..Default::default()
        });

        for path in &["/api/users/123", "/api/users/456"] {
            store.get_or_create(path);
        }

        // Raw paths are used as keys, so each one gets its own profile.
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn test_profile_store_metrics() {
        let store = ProfileStore::default();
        create_endpoints(&store, 5);

        let snapshot = store.metrics();
        assert_eq!(snapshot.current_profiles, 5);
        assert_eq!(snapshot.total_created, 5);
        assert_eq!(snapshot.total_evicted, 0);
    }

    #[test]
    fn test_profile_store_clear() {
        let store = ProfileStore::default();
        create_endpoints(&store, 5);
        assert_eq!(store.len(), 5);

        store.clear();
        assert!(store.is_empty());
    }

    #[test]
    fn test_profile_store_mature_profiles() {
        let store = ProfileStore::new(ProfileStoreConfig {
            min_samples_for_detection: 10,
            enable_segment_detection: false,
            ..Default::default()
        });

        // One profile above the maturity bar, one below it.
        for (path, samples) in &[("/api/mature", 15), ("/api/immature", 5)] {
            let mut profile = store.get_or_create(path);
            for _ in 0..*samples {
                profile.update(100, &[], None, now_ms());
            }
        }

        let mature = store.mature_profiles();
        assert_eq!(mature.len(), 1);
        assert!(mature.contains(&"/api/mature".to_string()));
    }

    #[test]
    fn test_segment_cardinality_clear() {
        let tracker = SegmentCardinality::new(100);

        fill_position_zero(&tracker, 10, 20);
        assert_eq!(tracker.cardinality(0), 10);

        tracker.clear();
        assert_eq!(tracker.cardinality(0), 0);
    }

    #[test]
    fn test_segment_cardinality_max_values() {
        // The tracker retains at most five values per position.
        let tracker = SegmentCardinality::new(5);

        // Offer ten distinct values.
        fill_position_zero(&tracker, 10, 100);

        // Only the first five are kept.
        assert_eq!(tracker.cardinality(0), 5);
    }
}