mod distribution;
mod endpoint_profile;
mod rate_tracker;
mod signals;
pub mod entropy;
pub mod header_profiler;
pub mod header_types;
pub mod patterns;
pub mod profile_store;
pub mod schema_learner;
pub mod schema_types;
pub mod template_intern;
#[cfg(test)]
mod value_analysis_tests;
pub use distribution::{Distribution, PercentilesTracker};
pub use endpoint_profile::{is_likely_pii, redact_value, EndpointProfile, ParamStats};
pub use rate_tracker::RateTracker;
pub use signals::{AnomalyResult, AnomalySignal, AnomalySignalType};
pub use entropy::{entropy_z_score, is_entropy_anomaly, normalized_entropy, shannon_entropy};
pub use header_profiler::{HeaderProfiler, HeaderProfilerStats};
pub use header_types::{HeaderAnomaly, HeaderAnomalyResult, HeaderBaseline, ValueStats};
pub use patterns::{detect_pattern, matches_pattern};
pub use profile_store::{
ProfileStore, ProfileStoreConfig, ProfileStoreMetrics, SegmentCardinality,
};
pub use schema_learner::{SchemaLearner, SchemaLearnerConfig, SchemaLearnerStats};
pub use schema_types::{
EndpointSchema as JsonEndpointSchema, FieldSchema, FieldType, PatternType, SchemaViolation,
ValidationResult, ViolationSeverity, ViolationType,
};
pub use template_intern::{
cache_stats as template_cache_stats, intern_template, normalize_and_intern,
};
use dashmap::DashMap;
use std::collections::HashMap;
use crate::config::ProfilerConfig;
/// Per-endpoint traffic profiler.
///
/// Accumulates [`EndpointProfile`]s keyed by endpoint template, derives
/// [`ParameterSchema`]s from mature profiles, and scores individual requests and
/// responses against the learned baselines. All state lives in `DashMap`s, so
/// every method takes `&self`.
#[derive(Debug)]
pub struct Profiler {
    // Limits (max profiles/schemas, freeze point) and anomaly thresholds.
    config: ProfilerConfig,
    // Learned profiles keyed by endpoint template; new templates are refused once
    // `config.max_profiles` entries exist.
    profiles: DashMap<String, EndpointProfile>,
    // Schemas produced by `learn_schema`, keyed by endpoint template.
    schemas: DashMap<String, ParameterSchema>,
}
/// Serializable summary of an endpoint's learned request shape, built from an
/// [`EndpointProfile`] by [`ParameterSchema::from_profile`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ParameterSchema {
    /// Endpoint template the schema was derived from.
    pub template: String,
    /// Content types that made up a significant share of the profile's samples
    /// (more than 10% when built by `from_profile`).
    pub expected_content_types: Vec<String>,
    /// Parameters whose observed frequency met the required-param threshold.
    pub required_params: Vec<String>,
    /// Parameters seen in the profile but below the required-param threshold.
    pub optional_params: Vec<String>,
    /// Per-parameter statistics copied from the profile.
    pub param_stats: HashMap<String, ParamStats>,
    /// Smallest payload size recorded in the profile's size distribution.
    pub min_payload_size: usize,
    /// Largest payload size recorded in the profile's size distribution.
    pub max_payload_size: usize,
    /// Number of samples the profile had when the schema was built.
    pub sample_count: u32,
    /// Wall-clock build time in milliseconds since the Unix epoch (0 if the
    /// system clock read earlier than the epoch).
    pub last_updated_ms: u64,
}
impl ParameterSchema {
    /// A content type is listed in `expected_content_types` only if it accounts
    /// for strictly more than this share of the profile's samples.
    const MIN_CONTENT_TYPE_SHARE: f64 = 0.1;

    /// Builds a schema snapshot from a learned endpoint profile.
    ///
    /// # Arguments
    /// * `profile` - the profile to summarize; it is only read.
    /// * `param_threshold` - every parameter in `profile.expected_params` whose
    ///   `profile.param_frequency(param)` is at or above this value is classified
    ///   as required; the rest are optional.
    ///
    /// `expected_content_types`, `required_params` and `optional_params` are
    /// sorted, so identical profiles always produce identical schemas regardless
    /// of the iteration order of the profile's maps. `last_updated_ms` is the
    /// current wall-clock time in milliseconds since the Unix epoch (0 if the
    /// clock reads earlier than the epoch).
    pub fn from_profile(profile: &EndpointProfile, param_threshold: f64) -> Self {
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        // An empty profile has no meaningful content-type shares (and would
        // otherwise divide by zero).
        let mut expected_content_types: Vec<String> = if profile.sample_count > 0 {
            let total = profile.sample_count as f64;
            profile
                .content_types
                .iter()
                .filter(|(_, &count)| count as f64 / total > Self::MIN_CONTENT_TYPE_SHARE)
                .map(|(ct, _)| ct.clone())
                .collect()
        } else {
            Vec::new()
        };
        expected_content_types.sort_unstable();

        let mut required_params = Vec::new();
        let mut optional_params = Vec::new();
        let mut param_stats = HashMap::new();
        for (param, stats) in &profile.expected_params {
            param_stats.insert(param.clone(), stats.clone());
            if profile.param_frequency(param) >= param_threshold {
                required_params.push(param.clone());
            } else {
                optional_params.push(param.clone());
            }
        }
        // Map iteration order is arbitrary; sort for reproducible output.
        required_params.sort_unstable();
        optional_params.sort_unstable();

        Self {
            template: profile.template.clone(),
            expected_content_types,
            required_params,
            optional_params,
            param_stats,
            min_payload_size: profile.payload_size.min() as usize,
            max_payload_size: profile.payload_size.max() as usize,
            sample_count: profile.sample_count,
            last_updated_ms: now_ms,
        }
    }
}
impl Profiler {
    /// Frequency at or above which a parameter is recorded as required when a
    /// schema is learned.
    const SCHEMA_REQUIRED_PARAM_THRESHOLD: f64 = 0.8;

    /// Parameters whose observed frequency is below this value are reported as
    /// unexpected by [`Profiler::analyze_request`].
    const UNEXPECTED_PARAM_FREQUENCY: f64 = 0.01;

    /// A 5xx response is flagged only when the profile's historical error rate
    /// is below this value.
    const RARE_ERROR_RATE: f64 = 0.05;

    /// With PII redaction enabled, length-anomalous parameter values longer than
    /// this many bytes are redacted even if they do not look like PII.
    const REDACT_LONG_VALUE_LEN: usize = 20;

    /// z-scores are clamped to this value before being used as a severity.
    const MAX_Z_SEVERITY: f64 = 10.0;

    /// Creates an empty profiler governed by `config`.
    pub fn new(config: ProfilerConfig) -> Self {
        Self {
            config,
            profiles: DashMap::new(),
            schemas: DashMap::new(),
        }
    }

    /// Returns whether profiling is enabled in the configuration.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }

    /// Returns a copy of the profile for `template`, creating an empty one first
    /// if none exists.
    ///
    /// Returns `None` when profiling is disabled, or when `template` is new and
    /// `max_profiles` profiles already exist. The returned value is a clone;
    /// mutating it does not affect the stored profile.
    pub fn get_or_create_profile(&self, template: &str) -> Option<EndpointProfile> {
        if !self.config.enabled {
            return None;
        }
        if !self.profiles.contains_key(template) && self.profiles.len() >= self.config.max_profiles
        {
            return None;
        }
        let profile = self
            .profiles
            .entry(template.to_string())
            .or_insert_with(|| EndpointProfile::new(template.to_string(), Self::current_time_ms()))
            .clone();
        Some(profile)
    }

    /// Records one request sample for `template`, creating the profile if needed.
    ///
    /// This is a no-op when profiling is disabled, when the profile is frozen
    /// (see `freeze_after_samples`), or when `template` is new and
    /// `max_profiles` profiles already exist.
    pub fn update_profile(
        &self,
        template: &str,
        payload_size: usize,
        params: &[(&str, &str)],
        content_type: Option<&str>,
    ) {
        if !self.config.enabled {
            return;
        }
        let now_ms = Self::current_time_ms();

        // Fast path: the profile exists; look it up by `&str` without allocating.
        if let Some(mut profile) = self.profiles.get_mut(template) {
            if !self.is_frozen_count(profile.sample_count) {
                profile.update(payload_size, params, content_type, now_ms);
            }
            return;
        }

        // Slow path. Check capacity *before* taking the entry lock: `len()`
        // read-locks every shard and would deadlock against a held entry guard.
        if self.profiles.len() >= self.config.max_profiles {
            return;
        }
        // Use `entry` rather than `insert` so that a profile created by another
        // thread since the lookup above is updated instead of being overwritten.
        let mut profile = self
            .profiles
            .entry(template.to_string())
            .or_insert_with(|| EndpointProfile::new(template.to_string(), now_ms));
        if !self.is_frozen_count(profile.sample_count) {
            profile.update(payload_size, params, content_type, now_ms);
        }
    }

    /// Records a response observation into the existing profile for `template`.
    ///
    /// Unlike [`Profiler::update_profile`] this never creates a profile. It is a
    /// no-op when profiling is disabled or the profile is frozen.
    pub fn update_response_profile(
        &self,
        template: &str,
        response_size: usize,
        status_code: u16,
        content_type: Option<&str>,
    ) {
        if !self.config.enabled {
            return;
        }
        let now_ms = Self::current_time_ms();
        if let Some(mut profile) = self.profiles.get_mut(template) {
            if !self.is_frozen_count(profile.sample_count) {
                profile.update_response(response_size, status_code, content_type, now_ms);
            }
        }
    }

    /// Returns clones of all stored profiles.
    pub fn get_profiles(&self) -> Vec<EndpointProfile> {
        self.profiles.iter().map(|e| e.value().clone()).collect()
    }

    /// Returns a clone of the profile for `template`, if any.
    pub fn get_profile(&self, template: &str) -> Option<EndpointProfile> {
        self.profiles.get(template).map(|p| p.value().clone())
    }

    /// Number of stored profiles.
    pub fn profile_count(&self) -> usize {
        self.profiles.len()
    }

    /// Derives (or re-derives) the [`ParameterSchema`] for `template` from its
    /// profile.
    ///
    /// Nothing happens when profiling is disabled, the profile does not exist,
    /// or it is not yet mature (`min_samples_for_validation`). Only *new*
    /// templates count against `max_schemas`; an existing schema is always
    /// refreshed in place, so schemas keep tracking their profiles after the cap
    /// has been reached.
    pub fn learn_schema(&self, template: &str) {
        if !self.config.enabled {
            return;
        }
        let profile = match self.profiles.get(template) {
            Some(p) if p.is_mature(self.config.min_samples_for_validation) => p,
            _ => return,
        };
        if !self.schemas.contains_key(template)
            && self.schemas.len() >= self.config.max_schemas
        {
            return;
        }
        let schema =
            ParameterSchema::from_profile(&profile, Self::SCHEMA_REQUIRED_PARAM_THRESHOLD);
        self.schemas.insert(template.to_string(), schema);
    }

    /// Returns clones of all learned schemas.
    pub fn get_schemas(&self) -> Vec<ParameterSchema> {
        self.schemas.iter().map(|e| e.value().clone()).collect()
    }

    /// Returns a clone of the schema for `template`, if any.
    pub fn get_schema(&self, template: &str) -> Option<ParameterSchema> {
        self.schemas.get(template).map(|s| s.value().clone())
    }

    /// Number of learned schemas.
    pub fn schema_count(&self) -> usize {
        self.schemas.len()
    }

    /// Removes every stored profile (schemas are kept).
    pub fn reset_profiles(&self) {
        self.profiles.clear();
    }

    /// Removes every learned schema (profiles are kept).
    pub fn reset_schemas(&self) {
        self.schemas.clear();
    }

    /// Scores a request against the learned profile for `template`.
    ///
    /// Returns `AnomalyResult::none()` when profiling is disabled or the profile
    /// is missing or immature. Otherwise it checks:
    /// * payload size z-score against `payload_z_threshold` (high and low);
    /// * each parameter for rarity, length z-score (`param_z_threshold`) and,
    ///   for mostly-numeric parameters, a non-numeric value;
    /// * the request content type against those seen before.
    ///
    /// Parameter values in messages are redacted per `redact_pii`.
    pub fn analyze_request(
        &self,
        template: &str,
        payload_size: usize,
        params: &[(&str, &str)],
        content_type: Option<&str>,
    ) -> AnomalyResult {
        if !self.config.enabled {
            return AnomalyResult::none();
        }
        let guard = match self.profiles.get(template) {
            Some(p) if p.value().is_mature(self.config.min_samples_for_validation) => p,
            _ => return AnomalyResult::none(),
        };
        let profile = guard.value();
        let mut result = AnomalyResult::new();

        let z_score = profile.payload_size.z_score(payload_size as f64);
        if z_score > self.config.payload_z_threshold {
            result.add(
                AnomalySignalType::PayloadSizeHigh,
                Self::z_severity(z_score),
                format!(
                    "Payload size {} is {:.1} std devs above mean",
                    payload_size, z_score
                ),
            );
        } else if z_score < -self.config.payload_z_threshold {
            result.add(
                AnomalySignalType::PayloadSizeLow,
                2,
                format!(
                    "Payload size {} is {:.1} std devs below mean",
                    payload_size,
                    z_score.abs()
                ),
            );
        }

        for &(param, value) in params {
            if profile.param_frequency(param) < Self::UNEXPECTED_PARAM_FREQUENCY {
                let display_value = self.display_value(value, false);
                result.add(
                    AnomalySignalType::UnexpectedParam,
                    3,
                    format!("Unexpected parameter: {} (value: {})", param, display_value),
                );
            } else if let Some(stats) = profile.expected_params.get(param) {
                let len_z = stats.length_dist.z_score(value.len() as f64);
                if len_z > self.config.param_z_threshold {
                    // Anomalously long values are the likeliest to carry secrets,
                    // so long values are redacted too when redaction is on.
                    let display_value = self.display_value(value, true);
                    result.add(
                        AnomalySignalType::ParamValueAnomaly,
                        Self::z_severity(len_z),
                        format!(
                            "Parameter {} length {} is anomalous (z={:.1}, value: {})",
                            param,
                            value.len(),
                            len_z,
                            display_value
                        ),
                    );
                }
                if stats.count > 0 {
                    let numeric_ratio =
                        *stats.type_counts.get("numeric").unwrap_or(&0) as f64 / stats.count as f64;
                    if numeric_ratio > self.config.type_ratio_threshold
                        && value.parse::<f64>().is_err()
                    {
                        result.add(
                            AnomalySignalType::ParamValueAnomaly,
                            5,
                            format!("Parameter {} expected numeric, got string", param),
                        );
                    }
                }
            }
        }

        if let Some(ct) = content_type {
            if let Some(dominant) = profile.dominant_content_type() {
                if ct != dominant && !profile.content_types.contains_key(ct) {
                    result.add(
                        AnomalySignalType::ContentTypeMismatch,
                        5,
                        format!(
                            "Content-Type {} not seen before (expected {})",
                            ct, dominant
                        ),
                    );
                }
            }
        }
        result.normalize();
        result
    }

    /// Scores a response against the learned profile for `template`.
    ///
    /// Returns `AnomalyResult::none()` when profiling is disabled or the profile
    /// is missing or immature. Otherwise it checks:
    /// * response size z-score against `response_z_threshold`;
    /// * a 5xx status on an endpoint whose historical error rate is low;
    /// * the response content type against those seen before.
    pub fn analyze_response(
        &self,
        template: &str,
        response_size: usize,
        status_code: u16,
        content_type: Option<&str>,
    ) -> AnomalyResult {
        if !self.config.enabled {
            return AnomalyResult::none();
        }
        let guard = match self.profiles.get(template) {
            Some(p) if p.value().is_mature(self.config.min_samples_for_validation) => p,
            _ => return AnomalyResult::none(),
        };
        let profile = guard.value();
        let mut result = AnomalyResult::new();

        let size_z = profile.response_size.z_score(response_size as f64);
        if size_z > self.config.response_z_threshold {
            result.add(
                AnomalySignalType::PayloadSizeHigh,
                Self::z_severity(size_z),
                format!(
                    "Response size {} is {:.1} std devs above mean (possible leak)",
                    response_size, size_z
                ),
            );
        }

        // A server error is only notable on endpoints that rarely fail.
        if status_code >= 500 {
            let error_rate = profile.error_rate();
            if error_rate < Self::RARE_ERROR_RATE {
                result.add(
                    AnomalySignalType::AbnormalErrorRate,
                    5,
                    format!(
                        "Unexpected 5xx error (usual rate: {:.1}%)",
                        error_rate * 100.0
                    ),
                );
            }
        }

        if let Some(ct) = content_type {
            if let Some(dominant) = profile.dominant_response_content_type() {
                if ct != dominant && !profile.response_content_types.contains_key(ct) {
                    result.add(
                        AnomalySignalType::ContentTypeMismatch,
                        3,
                        format!(
                            "Response Content-Type {} not seen before (expected {})",
                            ct, dominant
                        ),
                    );
                }
            }
        }
        result.normalize();
        result
    }

    /// Returns whether the profile for `template` has stopped learning because
    /// it reached `freeze_after_samples` samples. It is always `false` when
    /// freezing is disabled (`freeze_after_samples == 0`) or the profile does
    /// not exist.
    pub fn is_profile_frozen(&self, template: &str) -> bool {
        if self.config.freeze_after_samples == 0 {
            return false;
        }
        match self.profiles.get(template) {
            Some(p) => self.is_frozen_count(p.value().sample_count),
            None => false,
        }
    }

    /// Milliseconds since the Unix epoch, or 0 if the system clock reads
    /// earlier than the epoch.
    fn current_time_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0)
    }

    /// Whether a profile holding `sample_count` samples is frozen; a
    /// `freeze_after_samples` of 0 disables freezing.
    fn is_frozen_count(&self, sample_count: u32) -> bool {
        self.config.freeze_after_samples > 0 && sample_count >= self.config.freeze_after_samples
    }

    /// Maps a z-score to a severity in `1..=10`.
    fn z_severity(z: f64) -> u8 {
        (z.min(Self::MAX_Z_SEVERITY) as u8).max(1)
    }

    /// Renders a parameter value for an anomaly message.
    ///
    /// With `redact_pii` enabled, likely-PII values are redacted. When
    /// `redact_long_values` is set, values longer than `REDACT_LONG_VALUE_LEN`
    /// bytes are redacted as well.
    fn display_value(&self, value: &str, redact_long_values: bool) -> String {
        let redact = self.config.redact_pii
            && (is_likely_pii(value)
                || (redact_long_values && value.len() > Self::REDACT_LONG_VALUE_LEN));
        if redact {
            redact_value(value)
        } else {
            value.to_string()
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Enabled profiler with small limits; other fields use their defaults.
    fn default_config() -> ProfilerConfig {
        ProfilerConfig {
            enabled: true,
            max_profiles: 100,
            max_schemas: 50,
            min_samples_for_validation: 10,
            ..Default::default()
        }
    }

    #[test]
    fn test_profiler_new() {
        let profiler = Profiler::new(default_config());
        assert!(profiler.is_enabled());
        assert_eq!(profiler.profile_count(), 0);
        assert_eq!(profiler.schema_count(), 0);
    }

    #[test]
    fn test_profiler_update_and_get_profile() {
        let profiler = Profiler::new(default_config());
        profiler.update_profile(
            "/api/users",
            100,
            &[("name", "alice"), ("email", "a@example.com")],
            Some("application/json"),
        );
        assert_eq!(profiler.profile_count(), 1);
        let profile = profiler.get_profile("/api/users").unwrap();
        assert_eq!(profile.sample_count, 1);
    }

    #[test]
    fn test_profiler_disabled() {
        let config = ProfilerConfig {
            enabled: false,
            ..default_config()
        };
        let profiler = Profiler::new(config);
        profiler.update_profile("/api/users", 100, &[], None);
        assert_eq!(profiler.profile_count(), 0);
    }

    #[test]
    fn test_profiler_max_profiles() {
        let config = ProfilerConfig {
            max_profiles: 2,
            ..default_config()
        };
        let profiler = Profiler::new(config);
        profiler.update_profile("/api/a", 100, &[], None);
        profiler.update_profile("/api/b", 100, &[], None);
        profiler.update_profile("/api/c", 100, &[], None);
        assert_eq!(profiler.profile_count(), 2);
    }

    #[test]
    fn test_profiler_freeze_after_samples() {
        let config = ProfilerConfig {
            freeze_after_samples: 3,
            ..default_config()
        };
        let profiler = Profiler::new(config);
        for _ in 0..5 {
            profiler.update_profile("/api/frozen", 100, &[], None);
        }
        // Updates past the freeze point are ignored.
        assert_eq!(profiler.get_profile("/api/frozen").unwrap().sample_count, 3);
        assert!(profiler.is_profile_frozen("/api/frozen"));
        assert!(!profiler.is_profile_frozen("/api/unknown"));
    }

    #[test]
    fn test_profiler_freeze_disabled() {
        let config = ProfilerConfig {
            freeze_after_samples: 0,
            ..default_config()
        };
        let profiler = Profiler::new(config);
        for _ in 0..5 {
            profiler.update_profile("/api/a", 100, &[], None);
        }
        assert_eq!(profiler.get_profile("/api/a").unwrap().sample_count, 5);
        assert!(!profiler.is_profile_frozen("/api/a"));
    }

    #[test]
    fn test_get_or_create_profile_respects_max_profiles() {
        let config = ProfilerConfig {
            max_profiles: 1,
            ..default_config()
        };
        let profiler = Profiler::new(config);
        assert!(profiler.get_or_create_profile("/api/a").is_some());
        // A new template is refused at capacity, an existing one is still served.
        assert!(profiler.get_or_create_profile("/api/b").is_none());
        assert!(profiler.get_or_create_profile("/api/a").is_some());
        assert_eq!(profiler.profile_count(), 1);
    }

    #[test]
    fn test_get_or_create_profile_disabled() {
        let config = ProfilerConfig {
            enabled: false,
            ..default_config()
        };
        let profiler = Profiler::new(config);
        assert!(profiler.get_or_create_profile("/api/a").is_none());
        assert_eq!(profiler.profile_count(), 0);
    }

    #[test]
    fn test_profiler_learn_schema() {
        let config = ProfilerConfig {
            min_samples_for_validation: 5,
            ..default_config()
        };
        let profiler = Profiler::new(config);
        for i in 0..10 {
            profiler.update_profile(
                "/api/users",
                100 + i,
                &[("name", "alice")],
                Some("application/json"),
            );
        }
        profiler.learn_schema("/api/users");
        assert_eq!(profiler.schema_count(), 1);
        let schema = profiler.get_schema("/api/users").unwrap();
        assert_eq!(schema.template, "/api/users");
    }

    #[test]
    fn test_profiler_learn_schema_requires_mature_profile() {
        let profiler = Profiler::new(default_config());
        for _ in 0..3 {
            profiler.update_profile("/api/users", 100, &[("name", "alice")], None);
        }
        profiler.learn_schema("/api/users");
        profiler.learn_schema("/api/missing");
        assert_eq!(profiler.schema_count(), 0);
        assert!(profiler.get_schema("/api/users").is_none());
    }

    #[test]
    fn test_profiler_reset() {
        let profiler = Profiler::new(default_config());
        profiler.update_profile("/api/a", 100, &[], None);
        profiler.update_profile("/api/b", 100, &[], None);
        assert_eq!(profiler.profile_count(), 2);
        profiler.reset_profiles();
        assert_eq!(profiler.profile_count(), 0);
    }

    #[test]
    fn test_profiler_reset_schemas_keeps_profiles() {
        let config = ProfilerConfig {
            min_samples_for_validation: 5,
            ..default_config()
        };
        let profiler = Profiler::new(config);
        for _ in 0..10 {
            profiler.update_profile("/api/users", 100, &[("name", "alice")], None);
        }
        profiler.learn_schema("/api/users");
        assert_eq!(profiler.schema_count(), 1);
        profiler.reset_schemas();
        assert_eq!(profiler.schema_count(), 0);
        assert_eq!(profiler.profile_count(), 1);
    }
}