Skip to main content

synapse_pingora/profiler/
mod.rs

1//! API Endpoint Profiler Module
2//!
3//! This module provides behavioral profiling for API endpoints, including:
4//! - Statistical baseline learning (payload sizes, parameters, content types)
5//! - Anomaly detection based on deviations from baseline
6//! - Schema learning and validation (JSON body schema inference)
7//! - Rate tracking and burst detection
8//! - Pattern detection (UUID, email, JWT, etc.)
9//!
10//! ## Architecture
11//!
12//! ### Core Components
13//! - `EndpointProfile` - Per-endpoint statistical baseline
14//! - `Distribution` - Statistical distribution tracker (Welford's algorithm)
15//! - `PercentilesTracker` - P-square streaming percentiles
16//! - `RateTracker` - Time-windowed request rate tracking
17//! - `AnomalySignal` - Individual anomaly signal with severity
18//! - `AnomalyResult` - Aggregated detection results for a request
19//!
20//! ### Parameter Schema (local)
21//! - `ParameterSchema` - Expected parameters, content types, and payload sizes
22//!
23//! ### JSON Schema Learning (Ported from libsynapse)
24//! - `SchemaLearner` - Thread-safe JSON schema learning engine
25//! - `FieldSchema` - Per-field type and constraint tracking
26//! - `JsonEndpointSchema` - Full schema for request/response JSON bodies
27//! - `ValidationResult` - Schema violation detection
28//!
29//! **Note**: Schema learning only processes JSON object bodies. Array-root bodies
30//! (e.g., `[{...}, {...}]`) are silently skipped. APIs using arrays as the root
31//! element will not benefit from schema learning or validation.
32//!
33//! ### Storage
34//! - `ProfileStore` - Thread-safe storage with LRU eviction
35//! - `SegmentCardinality` - Dynamic path segment detection
36//!
37//! ## Memory Budget
38//!
39//! - Distribution: ~130 bytes
40//! - RateTracker: ~520 bytes
41//! - EndpointProfile: ~2KB
42//! - SchemaLearner: ~5KB per endpoint
43//! - ProfileStore: 10,000 profiles * 2KB = ~20MB default
44
45// Core modules
46mod distribution;
47mod endpoint_profile;
48mod rate_tracker;
49mod signals;
50
51// Header profiling modules (W4.1 HeaderProfiler)
52pub mod entropy;
53pub mod header_profiler;
54pub mod header_types;
55
56// Schema learning modules (ported from libsynapse)
57pub mod patterns;
58pub mod profile_store;
59pub mod schema_learner;
60pub mod schema_types;
61
62// Template path interning for allocation reduction
63pub mod template_intern;
64
65#[cfg(test)]
66mod value_analysis_tests;
67
68// Core re-exports
69pub use distribution::{Distribution, PercentilesTracker};
70pub use endpoint_profile::{is_likely_pii, redact_value, EndpointProfile, ParamStats};
71pub use rate_tracker::RateTracker;
72pub use signals::{AnomalyResult, AnomalySignal, AnomalySignalType};
73
74// Header profiler re-exports
75pub use entropy::{entropy_z_score, is_entropy_anomaly, normalized_entropy, shannon_entropy};
76pub use header_profiler::{HeaderProfiler, HeaderProfilerStats};
77pub use header_types::{HeaderAnomaly, HeaderAnomalyResult, HeaderBaseline, ValueStats};
78
79// Schema learning re-exports
80pub use patterns::{detect_pattern, matches_pattern};
81pub use profile_store::{
82    ProfileStore, ProfileStoreConfig, ProfileStoreMetrics, SegmentCardinality,
83};
84pub use schema_learner::{SchemaLearner, SchemaLearnerConfig, SchemaLearnerStats};
85pub use schema_types::{
86    EndpointSchema as JsonEndpointSchema, FieldSchema, FieldType, PatternType, SchemaViolation,
87    ValidationResult, ViolationSeverity, ViolationType,
88};
89
90// Template interning re-exports
91pub use template_intern::{
92    cache_stats as template_cache_stats, intern_template, normalize_and_intern,
93};
94
95use dashmap::DashMap;
96use std::collections::HashMap;
97
98use crate::config::ProfilerConfig;
99
/// Profiler manager for endpoint behavior learning and anomaly detection.
///
/// Keeps one [`EndpointProfile`] per path template and, once
/// `learn_schema` has been called for a mature profile, one
/// [`ParameterSchema`] per template. Both collections are `DashMap`s and every
/// method on this type takes `&self`.
#[derive(Debug)]
pub struct Profiler {
    /// Configuration (enable flag, capacity limits, freeze and detection thresholds).
    config: ProfilerConfig,
    /// Endpoint profiles (template -> profile)
    profiles: DashMap<String, EndpointProfile>,
    /// Learned schemas (template -> schema definition)
    schemas: DashMap<String, ParameterSchema>,
}
110
/// Learned parameter schema for an endpoint.
///
/// This tracks expected parameters, content types, and payload sizes for an endpoint.
/// For JSON body schema learning (field types, constraints), see `JsonEndpointSchema`.
///
/// Instances are snapshots built by `ParameterSchema::from_profile`; they are not
/// updated in place when the underlying profile changes.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ParameterSchema {
    /// Path template this schema was learned for (copied from the profile).
    pub template: String,
    /// Content types seen in more than 10% of the profile's samples.
    pub expected_content_types: Vec<String>,
    /// Parameters whose observed frequency is at or above the threshold passed
    /// to `from_profile`.
    pub required_params: Vec<String>,
    /// Parameters seen in the profile whose frequency is below that threshold.
    pub optional_params: Vec<String>,
    /// Parameter statistics (value analysis), cloned from the profile for every
    /// parameter, required or optional.
    pub param_stats: HashMap<String, ParamStats>,
    /// Minimum payload size observed by the profile.
    pub min_payload_size: usize,
    /// Maximum payload size observed by the profile.
    pub max_payload_size: usize,
    /// Sample count used to build schema
    pub sample_count: u32,
    /// Time the schema was built, in milliseconds since the Unix epoch
    /// (0 if the system clock reports a time before the epoch).
    pub last_updated_ms: u64,
}
136
137impl ParameterSchema {
138    /// Create a new schema from an endpoint profile.
139    pub fn from_profile(profile: &EndpointProfile, param_threshold: f64) -> Self {
140        let now_ms = std::time::SystemTime::now()
141            .duration_since(std::time::UNIX_EPOCH)
142            .map(|d| d.as_millis() as u64)
143            .unwrap_or(0);
144
145        // Extract expected content types (seen in >10% of requests)
146        let expected_content_types: Vec<String> = if profile.sample_count > 0 {
147            profile
148                .content_types
149                .iter()
150                .filter(|(_, &count)| count as f64 / profile.sample_count as f64 > 0.1)
151                .map(|(ct, _)| ct.clone())
152                .collect()
153        } else {
154            Vec::new()
155        };
156
157        // Separate required (>80% frequency) vs optional params
158        let mut required_params = Vec::new();
159        let mut optional_params = Vec::new();
160        let mut param_stats = HashMap::new();
161
162        for (param, stats) in &profile.expected_params {
163            // Clone stats for schema
164            param_stats.insert(param.clone(), stats.clone());
165
166            if profile.param_frequency(param) >= param_threshold {
167                required_params.push(param.clone());
168            } else {
169                optional_params.push(param.clone());
170            }
171        }
172
173        Self {
174            template: profile.template.clone(),
175            expected_content_types,
176            required_params,
177            optional_params,
178            param_stats,
179            min_payload_size: profile.payload_size.min() as usize,
180            max_payload_size: profile.payload_size.max() as usize,
181            sample_count: profile.sample_count,
182            last_updated_ms: now_ms,
183        }
184    }
185}
186
187impl Profiler {
188    /// Create a new profiler with the given configuration.
189    pub fn new(config: ProfilerConfig) -> Self {
190        Self {
191            config,
192            profiles: DashMap::new(),
193            schemas: DashMap::new(),
194        }
195    }
196
197    /// Check if profiling is enabled.
198    pub fn is_enabled(&self) -> bool {
199        self.config.enabled
200    }
201
202    /// Get or create an endpoint profile.
203    pub fn get_or_create_profile(&self, template: &str) -> Option<EndpointProfile> {
204        if !self.config.enabled {
205            return None;
206        }
207
208        // Note: max_profiles is enforced approximately under concurrent access (soft limit).
209        // Bounded overrun is proportional to the number of concurrent callers.
210        if !self.profiles.contains_key(template) && self.profiles.len() >= self.config.max_profiles
211        {
212            return None; // At capacity, don't create new profile
213        }
214
215        let now_ms = std::time::SystemTime::now()
216            .duration_since(std::time::UNIX_EPOCH)
217            .map(|d| d.as_millis() as u64)
218            .unwrap_or(0);
219
220        Some(
221            self.profiles
222                .entry(template.to_string())
223                .or_insert_with(|| EndpointProfile::new(template.to_string(), now_ms))
224                .clone(),
225        )
226    }
227
228    /// Update an endpoint profile with request data.
229    ///
230    /// Respects the `freeze_after_samples` setting to prevent model poisoning.
231    /// If a profile has reached the freeze threshold, updates are silently ignored.
232    pub fn update_profile(
233        &self,
234        template: &str,
235        payload_size: usize,
236        params: &[(&str, &str)],
237        content_type: Option<&str>,
238    ) {
239        if !self.config.enabled {
240            return;
241        }
242
243        let now_ms = std::time::SystemTime::now()
244            .duration_since(std::time::UNIX_EPOCH)
245            .map(|d| d.as_millis() as u64)
246            .unwrap_or(0);
247
248        if let Some(mut profile) = self.profiles.get_mut(template) {
249            // Check if profile is frozen (anti-poisoning measure)
250            if self.config.freeze_after_samples > 0
251                && profile.sample_count >= self.config.freeze_after_samples
252            {
253                return; // Profile frozen, reject updates
254            }
255            profile.update(payload_size, params, content_type, now_ms);
256        } else {
257            // Note: max_profiles is enforced approximately under concurrent access (soft limit).
258            if self.profiles.len() < self.config.max_profiles {
259                let mut profile = EndpointProfile::new(template.to_string(), now_ms);
260                profile.update(payload_size, params, content_type, now_ms);
261                self.profiles.insert(template.to_string(), profile);
262            }
263        }
264    }
265
266    /// Update an endpoint profile with response data.
267    ///
268    /// Respects the `freeze_after_samples` setting to prevent model poisoning.
269    pub fn update_response_profile(
270        &self,
271        template: &str,
272        response_size: usize,
273        status_code: u16,
274        content_type: Option<&str>,
275    ) {
276        if !self.config.enabled {
277            return;
278        }
279
280        let now_ms = std::time::SystemTime::now()
281            .duration_since(std::time::UNIX_EPOCH)
282            .map(|d| d.as_millis() as u64)
283            .unwrap_or(0);
284
285        if let Some(mut profile) = self.profiles.get_mut(template) {
286            // Check if profile is frozen (anti-poisoning measure)
287            if self.config.freeze_after_samples > 0
288                && profile.sample_count >= self.config.freeze_after_samples
289            {
290                return; // Profile frozen, reject updates
291            }
292            profile.update_response(response_size, status_code, content_type, now_ms);
293        }
294    }
295
296    /// Get all profiles.
297    pub fn get_profiles(&self) -> Vec<EndpointProfile> {
298        self.profiles.iter().map(|e| e.value().clone()).collect()
299    }
300
301    /// Get a specific profile by template.
302    pub fn get_profile(&self, template: &str) -> Option<EndpointProfile> {
303        self.profiles.get(template).map(|p| p.value().clone())
304    }
305
306    /// Get the number of profiles.
307    pub fn profile_count(&self) -> usize {
308        self.profiles.len()
309    }
310
311    /// Learn schema from a profile if it has enough samples.
312    pub fn learn_schema(&self, template: &str) {
313        if !self.config.enabled {
314            return;
315        }
316
317        if let Some(profile) = self.profiles.get(template) {
318            if profile.is_mature(self.config.min_samples_for_validation) {
319                // Note: max_schemas is enforced approximately under concurrent access (soft limit).
320                if self.schemas.len() < self.config.max_schemas {
321                    let schema = ParameterSchema::from_profile(&profile, 0.8);
322                    self.schemas.insert(template.to_string(), schema);
323                }
324            }
325        }
326    }
327
328    /// Get all learned schemas.
329    pub fn get_schemas(&self) -> Vec<ParameterSchema> {
330        self.schemas.iter().map(|e| e.value().clone()).collect()
331    }
332
333    /// Get a specific schema by template.
334    pub fn get_schema(&self, template: &str) -> Option<ParameterSchema> {
335        self.schemas.get(template).map(|s| s.value().clone())
336    }
337
338    /// Get the number of schemas.
339    pub fn schema_count(&self) -> usize {
340        self.schemas.len()
341    }
342
343    /// Reset all profiles (for testing).
344    pub fn reset_profiles(&self) {
345        self.profiles.clear();
346    }
347
348    /// Reset all schemas (for testing).
349    pub fn reset_schemas(&self) {
350        self.schemas.clear();
351    }
352
353    /// Analyze a request against the learned profile.
354    ///
355    /// Uses configurable thresholds for anomaly detection and optional PII redaction.
356    pub fn analyze_request(
357        &self,
358        template: &str,
359        payload_size: usize,
360        params: &[(&str, &str)],
361        content_type: Option<&str>,
362    ) -> AnomalyResult {
363        if !self.config.enabled {
364            return AnomalyResult::none();
365        }
366
367        let profile_ref = self.profiles.get(template);
368        let profile = match profile_ref {
369            Some(p) if p.value().is_mature(self.config.min_samples_for_validation) => p,
370            _ => return AnomalyResult::none(), // No mature profile yet
371        };
372        let profile = profile.value();
373
374        let mut result = AnomalyResult::new();
375
376        // Check payload size anomaly using configurable threshold
377        let z_score = profile.payload_size.z_score(payload_size as f64);
378        if z_score > self.config.payload_z_threshold {
379            result.add(
380                AnomalySignalType::PayloadSizeHigh,
381                (z_score.min(10.0) as u8).max(1),
382                format!(
383                    "Payload size {} is {:.1} std devs above mean",
384                    payload_size, z_score
385                ),
386            );
387        } else if z_score < -self.config.payload_z_threshold {
388            result.add(
389                AnomalySignalType::PayloadSizeLow,
390                2,
391                format!(
392                    "Payload size {} is {:.1} std devs below mean",
393                    payload_size,
394                    z_score.abs()
395                ),
396            );
397        }
398
399        // Check for unexpected parameters and value anomalies
400        for &(param, value) in params {
401            if profile.param_frequency(param) < 0.01 {
402                // Redact value if PII protection is enabled
403                let display_value = if self.config.redact_pii && is_likely_pii(value) {
404                    redact_value(value)
405                } else {
406                    value.to_string()
407                };
408                result.add(
409                    AnomalySignalType::UnexpectedParam,
410                    3,
411                    format!("Unexpected parameter: {} (value: {})", param, display_value),
412                );
413            } else if let Some(stats) = profile.expected_params.get(param) {
414                // Value length analysis using configurable threshold
415                let len_z = stats.length_dist.z_score(value.len() as f64);
416                if len_z > self.config.param_z_threshold {
417                    // Redact value if PII protection is enabled
418                    let display_value = if self.config.redact_pii && is_likely_pii(value) {
419                        redact_value(value)
420                    } else if self.config.redact_pii && value.len() > 20 {
421                        // Redact long values that might contain sensitive data
422                        redact_value(value)
423                    } else {
424                        value.to_string()
425                    };
426                    result.add(
427                        AnomalySignalType::ParamValueAnomaly,
428                        (len_z.min(10.0) as u8).max(1),
429                        format!(
430                            "Parameter {} length {} is anomalous (z={:.1}, value: {})",
431                            param,
432                            value.len(),
433                            len_z,
434                            display_value
435                        ),
436                    );
437                }
438
439                // Type check (if numeric is dominant)
440                // FIX: Prevent division by zero when stats.count is 0
441                if stats.count > 0 {
442                    let numeric_ratio =
443                        *stats.type_counts.get("numeric").unwrap_or(&0) as f64 / stats.count as f64;
444                    if numeric_ratio > self.config.type_ratio_threshold
445                        && value.parse::<f64>().is_err()
446                    {
447                        result.add(
448                            AnomalySignalType::ParamValueAnomaly,
449                            5,
450                            format!("Parameter {} expected numeric, got string", param),
451                        );
452                    }
453                }
454            }
455        }
456
457        // Check content type mismatch
458        if let Some(ct) = content_type {
459            if let Some(dominant) = profile.dominant_content_type() {
460                if ct != dominant && !profile.content_types.contains_key(ct) {
461                    result.add(
462                        AnomalySignalType::ContentTypeMismatch,
463                        5,
464                        format!(
465                            "Content-Type {} not seen before (expected {})",
466                            ct, dominant
467                        ),
468                    );
469                }
470            }
471        }
472
473        result.normalize();
474        result
475    }
476
477    /// Analyze a response against the learned profile.
478    ///
479    /// Uses configurable thresholds for response size anomaly detection.
480    pub fn analyze_response(
481        &self,
482        template: &str,
483        response_size: usize,
484        status_code: u16,
485        content_type: Option<&str>,
486    ) -> AnomalyResult {
487        if !self.config.enabled {
488            return AnomalyResult::none();
489        }
490
491        let profile_ref = self.profiles.get(template);
492        let profile = match profile_ref {
493            Some(p) if p.value().is_mature(self.config.min_samples_for_validation) => p,
494            _ => return AnomalyResult::none(),
495        };
496        let profile = profile.value();
497
498        let mut result = AnomalyResult::new();
499
500        // Check response size anomaly (potential data leak) using configurable threshold
501        let size_z = profile.response_size.z_score(response_size as f64);
502        if size_z > self.config.response_z_threshold {
503            result.add(
504                AnomalySignalType::PayloadSizeHigh, // Re-using PayloadSizeHigh for response too
505                (size_z.min(10.0) as u8).max(1),
506                format!(
507                    "Response size {} is {:.1} std devs above mean (possible leak)",
508                    response_size, size_z
509                ),
510            );
511        }
512
513        // Check status code anomaly (error spike)
514        if status_code >= 500 {
515            let error_rate = profile.error_rate();
516            if error_rate < 0.05 {
517                // Normally stable endpoint
518                result.add(
519                    AnomalySignalType::AbnormalErrorRate,
520                    5,
521                    format!(
522                        "Unexpected 5xx error (usual rate: {:.1}%)",
523                        error_rate * 100.0
524                    ),
525                );
526            }
527        }
528
529        // Check content type mismatch
530        if let Some(ct) = content_type {
531            if let Some(dominant) = profile.dominant_response_content_type() {
532                if ct != dominant && !profile.response_content_types.contains_key(ct) {
533                    result.add(
534                        AnomalySignalType::ContentTypeMismatch,
535                        3,
536                        format!(
537                            "Response Content-Type {} not seen before (expected {})",
538                            ct, dominant
539                        ),
540                    );
541                }
542            }
543        }
544
545        result.normalize();
546        result
547    }
548
549    /// Check if a profile is frozen (no longer accepts updates).
550    ///
551    /// Frozen baselines help prevent model poisoning attacks where attackers
552    /// gradually shift the baseline to make malicious patterns appear normal.
553    pub fn is_profile_frozen(&self, template: &str) -> bool {
554        if self.config.freeze_after_samples == 0 {
555            return false; // Freezing disabled
556        }
557        self.profiles
558            .get(template)
559            .map(|p| p.value().sample_count >= self.config.freeze_after_samples)
560            .unwrap_or(false)
561    }
562}
563
#[cfg(test)]
mod tests {
    use super::*;

    /// Enabled profiler configuration with small limits, shared by all tests.
    fn default_config() -> ProfilerConfig {
        ProfilerConfig {
            enabled: true,
            max_profiles: 100,
            max_schemas: 50,
            min_samples_for_validation: 10,
            ..Default::default()
        }
    }

    /// A fresh profiler is enabled and holds neither profiles nor schemas.
    #[test]
    fn test_profiler_new() {
        let fresh = Profiler::new(default_config());

        assert!(fresh.is_enabled());
        assert_eq!(fresh.profile_count(), 0);
        assert_eq!(fresh.schema_count(), 0);
    }

    /// The first update for a template creates its profile with one sample.
    #[test]
    fn test_profiler_update_and_get_profile() {
        let profiler = Profiler::new(default_config());
        let request_params = [("name", "alice"), ("email", "a@example.com")];

        profiler.update_profile("/api/users", 100, &request_params, Some("application/json"));

        assert_eq!(profiler.profile_count(), 1);

        let stored = profiler.get_profile("/api/users").unwrap();
        assert_eq!(stored.sample_count, 1);
    }

    /// A disabled profiler ignores updates.
    #[test]
    fn test_profiler_disabled() {
        let profiler = Profiler::new(ProfilerConfig {
            enabled: false,
            ..default_config()
        });

        profiler.update_profile("/api/users", 100, &[], None);

        assert_eq!(profiler.profile_count(), 0);
    }

    /// Templates beyond `max_profiles` are not stored.
    #[test]
    fn test_profiler_max_profiles() {
        let profiler = Profiler::new(ProfilerConfig {
            max_profiles: 2,
            ..default_config()
        });

        for template in &["/api/a", "/api/b", "/api/c"] {
            profiler.update_profile(template, 100, &[], None);
        }

        // Should not exceed max_profiles
        assert_eq!(profiler.profile_count(), 2);
    }

    /// A profile with enough samples yields a schema carrying its template.
    #[test]
    fn test_profiler_learn_schema() {
        let profiler = Profiler::new(ProfilerConfig {
            min_samples_for_validation: 5,
            ..default_config()
        });

        // Add ten samples with payload sizes 100 through 109.
        for payload_size in 100..110 {
            profiler.update_profile(
                "/api/users",
                payload_size,
                &[("name", "alice")],
                Some("application/json"),
            );
        }

        profiler.learn_schema("/api/users");

        assert_eq!(profiler.schema_count(), 1);
        let learned = profiler.get_schema("/api/users").unwrap();
        assert_eq!(learned.template, "/api/users");
    }

    /// Resetting removes every stored profile.
    #[test]
    fn test_profiler_reset() {
        let profiler = Profiler::new(default_config());

        for template in &["/api/a", "/api/b"] {
            profiler.update_profile(template, 100, &[], None);
        }
        assert_eq!(profiler.profile_count(), 2);

        profiler.reset_profiles();
        assert_eq!(profiler.profile_count(), 0);
    }
}