use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use crate::Policy;
use crate::engine::CompiledMatchers;
use crate::engine::signal::{LogSignal, MetricSignal, TraceSignal};
use crate::error::PolicyError;
use crate::provider::{PolicyProvider, StatsCollector};
/// Identifier of a policy provider registered with a registry.
///
/// Ids are produced by `RegistryInner::register_provider` from a counter that
/// starts at 0 and is incremented once per registration. The wrapped integer
/// is private, so code outside this module cannot construct an id directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ProviderId(u64);
/// Hit/miss counters for a single transform stage.
///
/// Both counters are atomics, so they can be updated through a shared
/// reference. Every access uses [`Ordering::Relaxed`]: the counts themselves
/// are exact, but no ordering with other memory operations is implied.
#[derive(Debug, Default)]
pub struct TransformStageStats {
    /// Hits recorded since construction or the last [`reset`](Self::reset).
    pub hits: AtomicU64,
    /// Misses recorded since construction or the last [`reset`](Self::reset).
    pub misses: AtomicU64,
}

impl TransformStageStats {
    /// Adds one to the hit counter.
    pub fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to the miss counter.
    pub fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the current hit count without modifying it.
    pub fn hits(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }

    /// Returns the current miss count without modifying it.
    pub fn misses(&self) -> u64 {
        self.misses.load(Ordering::Relaxed)
    }

    /// Zeroes both counters and returns the values they held as
    /// `(hits, misses)`.
    ///
    /// The two counters are swapped one after the other (hits first), so the
    /// pair is not read as a single atomic unit.
    pub fn reset(&self) -> (u64, u64) {
        (
            self.hits.swap(0, Ordering::Relaxed),
            self.misses.swap(0, Ordering::Relaxed),
        )
    }
}
/// Live counters attached to one policy.
///
/// Holds the policy-level match hit/miss counters plus one
/// [`TransformStageStats`] for each of the `remove`, `redact`, `rename` and
/// `add` stages. All counters are atomics accessed with
/// [`Ordering::Relaxed`].
#[derive(Debug, Default)]
pub struct PolicyStats {
    /// Match hits recorded via [`record_hit`](Self::record_hit).
    pub match_hits: AtomicU64,
    /// Match misses recorded via [`record_miss`](Self::record_miss).
    pub match_misses: AtomicU64,
    /// Counters for the `remove` stage.
    pub remove: TransformStageStats,
    /// Counters for the `redact` stage.
    pub redact: TransformStageStats,
    /// Counters for the `rename` stage.
    pub rename: TransformStageStats,
    /// Counters for the `add` stage.
    pub add: TransformStageStats,
}

impl PolicyStats {
    /// Adds one to the match-hit counter.
    pub fn record_hit(&self) {
        self.match_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to the match-miss counter.
    pub fn record_miss(&self) {
        self.match_misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the current match-hit count without modifying it.
    pub fn hits(&self) -> u64 {
        self.match_hits.load(Ordering::Relaxed)
    }

    /// Returns the current match-miss count without modifying it.
    pub fn misses(&self) -> u64 {
        self.match_misses.load(Ordering::Relaxed)
    }

    /// Zeroes only the match counters and returns their previous values as
    /// `(hits, misses)`. The per-stage counters are left untouched.
    pub fn reset(&self) -> (u64, u64) {
        (
            self.match_hits.swap(0, Ordering::Relaxed),
            self.match_misses.swap(0, Ordering::Relaxed),
        )
    }

    /// Zeroes every counter (match and per-stage) and returns the values they
    /// held.
    ///
    /// Counters are drained one at a time, match counters first and then the
    /// stages in declaration order, so the returned snapshot is not taken as
    /// a single atomic unit.
    pub fn reset_all(&self) -> PolicyStatsSnapshot {
        let (match_hits, match_misses) = self.reset();
        let remove = self.remove.reset();
        let redact = self.redact.reset();
        let rename = self.rename.reset();
        let add = self.add.reset();
        PolicyStatsSnapshot {
            match_hits,
            match_misses,
            remove,
            redact,
            rename,
            add,
        }
    }
}

/// Plain-value copy of the counters of a [`PolicyStats`], as returned by
/// [`PolicyStats::reset_all`].
///
/// Each stage field is a `(hits, misses)` pair.
#[derive(Debug, Clone, Default)]
pub struct PolicyStatsSnapshot {
    /// Match hits drained from the policy.
    pub match_hits: u64,
    /// Match misses drained from the policy.
    pub match_misses: u64,
    /// `(hits, misses)` of the `remove` stage.
    pub remove: (u64, u64),
    /// `(hits, misses)` of the `redact` stage.
    pub redact: (u64, u64),
    /// `(hits, misses)` of the `rename` stage.
    pub rename: (u64, u64),
    /// `(hits, misses)` of the `add` stage.
    pub add: (u64, u64),
}
/// One policy as stored in a `PolicySnapshot`.
#[derive(Debug)]
pub struct PolicyEntry {
/// The policy itself (a clone of the one held in the provider map).
pub policy: Policy,
/// Id of the provider whose policy list this entry was built from.
pub provider_id: ProviderId,
/// Counters for this policy. The `Arc` is shared with the registry's
/// provider map, so updates made through a snapshot are visible there too.
pub stats: Arc<PolicyStats>,
}
/// Read-only view of every policy known to the registry at the moment the
/// snapshot was built.
///
/// The data lives behind an [`Arc`], so cloning a snapshot only bumps a
/// reference count and both clones share the same contents.
#[derive(Debug, Clone)]
pub struct PolicySnapshot {
    inner: Arc<SnapshotInner>,
}

/// Shared payload of a [`PolicySnapshot`].
#[derive(Debug)]
struct SnapshotInner {
    /// All policy entries in the snapshot.
    policies: Vec<PolicyEntry>,
    /// Policy id -> position of the entry in `policies`.
    index: HashMap<String, usize>,
    /// Matchers compiled for log policies, if any were built.
    compiled_logs: Option<CompiledMatchers<LogSignal>>,
    /// Matchers compiled for metric policies, if any were built.
    compiled_metrics: Option<CompiledMatchers<MetricSignal>>,
    /// Matchers compiled for trace policies, if any were built.
    compiled_traces: Option<CompiledMatchers<TraceSignal>>,
}

impl PolicySnapshot {
    /// Builds a snapshot with no policies and no compiled matchers.
    fn empty() -> Self {
        let inner = SnapshotInner {
            policies: Vec::new(),
            index: HashMap::new(),
            compiled_logs: None,
            compiled_metrics: None,
            compiled_traces: None,
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Returns all entries as a slice.
    pub fn policies(&self) -> &[PolicyEntry] {
        self.inner.policies.as_slice()
    }

    /// Looks up an entry by policy id, returning `None` when the id is not
    /// in the index.
    pub fn get(&self, id: &str) -> Option<&PolicyEntry> {
        let position = *self.inner.index.get(id)?;
        Some(&self.inner.policies[position])
    }

    /// Number of entries in the snapshot.
    pub fn len(&self) -> usize {
        self.policies().len()
    }

    /// `true` when the snapshot holds no entries.
    pub fn is_empty(&self) -> bool {
        self.policies().is_empty()
    }

    /// Iterates over the entries in storage order.
    pub fn iter(&self) -> impl Iterator<Item = &PolicyEntry> {
        self.policies().iter()
    }

    /// Compiled matchers for log policies, or `None` if the snapshot has
    /// none.
    pub fn compiled_log_matchers(&self) -> Option<&CompiledMatchers<LogSignal>> {
        let SnapshotInner { compiled_logs, .. } = &*self.inner;
        compiled_logs.as_ref()
    }

    /// Compiled matchers for metric policies, or `None` if the snapshot has
    /// none.
    pub fn compiled_metric_matchers(&self) -> Option<&CompiledMatchers<MetricSignal>> {
        let SnapshotInner {
            compiled_metrics, ..
        } = &*self.inner;
        compiled_metrics.as_ref()
    }

    /// Compiled matchers for trace policies, or `None` if the snapshot has
    /// none.
    pub fn compiled_trace_matchers(&self) -> Option<&CompiledMatchers<TraceSignal>> {
        let SnapshotInner {
            compiled_traces, ..
        } = &*self.inner;
        compiled_traces.as_ref()
    }
}
/// A policy paired with the shared counters that track it.
type PolicyWithStats = (Policy, Arc<PolicyStats>);
/// The current policy list (with stats) of each registered provider, keyed
/// by provider id.
type ProviderPolicies = HashMap<ProviderId, Vec<PolicyWithStats>>;
/// Handle through which one registered provider pushes its policies into the
/// registry.
///
/// Cloning the handle yields another handle for the same provider id and the
/// same registry state.
#[derive(Clone)]
pub struct ProviderHandle {
    provider_id: ProviderId,
    registry: Arc<RegistryInner>,
}

impl ProviderHandle {
    /// Replaces this provider's policy list with `policies` and has the
    /// registry rebuild its snapshot.
    pub fn update(&self, policies: Vec<Policy>) {
        let Self {
            provider_id,
            registry,
        } = self;
        registry.update_provider(*provider_id, policies);
    }

    /// Id of the provider this handle belongs to.
    pub fn provider_id(&self) -> ProviderId {
        let Self { provider_id, .. } = self;
        *provider_id
    }
}
struct RegistryInner {
next_provider_id: AtomicU64,
providers: RwLock<ProviderPolicies>,
snapshot: RwLock<PolicySnapshot>,
}
impl RegistryInner {
fn new() -> Self {
Self {
next_provider_id: AtomicU64::new(0),
providers: RwLock::new(HashMap::new()),
snapshot: RwLock::new(PolicySnapshot::empty()),
}
}
fn register_provider(&self) -> ProviderId {
let id = ProviderId(self.next_provider_id.fetch_add(1, Ordering::Relaxed));
let mut providers = self.providers.write().unwrap();
providers.insert(id, Vec::new());
id
}
fn update_provider(&self, provider_id: ProviderId, policies: Vec<Policy>) {
let mut providers = self.providers.write().unwrap();
let existing_stats: HashMap<String, Arc<PolicyStats>> = providers
.get(&provider_id)
.map(|entries| {
entries
.iter()
.map(|(p, s)| (p.id().to_string(), Arc::clone(s)))
.collect()
})
.unwrap_or_default();
let new_entries: Vec<(Policy, Arc<PolicyStats>)> = policies
.into_iter()
.map(|policy| {
let stats = existing_stats.get(policy.id()).cloned().unwrap_or_default();
(policy, stats)
})
.collect();
providers.insert(provider_id, new_entries);
self.rebuild_snapshot(&providers);
}
fn rebuild_snapshot(&self, providers: &ProviderPolicies) {
let mut policies = Vec::new();
let mut index = HashMap::new();
let mut log_policies: Vec<(Policy, Arc<PolicyStats>)> = Vec::new();
let mut metric_policies: Vec<(Policy, Arc<PolicyStats>)> = Vec::new();
let mut trace_policies: Vec<(Policy, Arc<PolicyStats>)> = Vec::new();
for (&provider_id, entries) in providers {
for (policy, stats) in entries {
let idx = policies.len();
index.insert(policy.id().to_string(), idx);
policies.push(PolicyEntry {
policy: policy.clone(),
provider_id,
stats: Arc::clone(stats),
});
if policy.log_target().is_some() {
log_policies.push((policy.clone(), Arc::clone(stats)));
} else if policy.metric_target().is_some() {
metric_policies.push((policy.clone(), Arc::clone(stats)));
} else if policy.trace_target().is_some() {
trace_policies.push((policy.clone(), Arc::clone(stats)));
}
}
}
log_policies.sort_by(|a, b| a.0.id().cmp(b.0.id()));
metric_policies.sort_by(|a, b| a.0.id().cmp(b.0.id()));
trace_policies.sort_by(|a, b| a.0.id().cmp(b.0.id()));
let compiled_logs = if !log_policies.is_empty() {
match CompiledMatchers::<LogSignal>::build(log_policies.into_iter()) {
Ok(matchers) => Some(matchers),
Err(e) => {
eprintln!("Failed to compile log policy matchers: {}", e);
None
}
}
} else {
None
};
let compiled_metrics = if !metric_policies.is_empty() {
match CompiledMatchers::<MetricSignal>::build(metric_policies.into_iter()) {
Ok(matchers) => Some(matchers),
Err(e) => {
eprintln!("Failed to compile metric policy matchers: {}", e);
None
}
}
} else {
None
};
let compiled_traces = if !trace_policies.is_empty() {
match CompiledMatchers::<TraceSignal>::build(trace_policies.into_iter()) {
Ok(matchers) => Some(matchers),
Err(e) => {
eprintln!("Failed to compile trace policy matchers: {}", e);
None
}
}
} else {
None
};
let new_snapshot = PolicySnapshot {
inner: Arc::new(SnapshotInner {
policies,
index,
compiled_logs,
compiled_metrics,
compiled_traces,
}),
};
let mut snapshot = self.snapshot.write().unwrap();
*snapshot = new_snapshot;
}
fn snapshot(&self) -> PolicySnapshot {
self.snapshot.read().unwrap().clone()
}
}
/// Registry that collects policies from any number of providers and exposes
/// them as immutable [`PolicySnapshot`]s.
pub struct PolicyRegistry {
    inner: Arc<RegistryInner>,
}

impl PolicyRegistry {
    /// Creates a registry with no providers and an empty snapshot.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(RegistryInner::new()),
        }
    }

    /// Registers `provider`, wires it to this registry, and returns the id it
    /// was assigned.
    ///
    /// Two things are handed to the provider, in this order:
    ///
    /// 1. a stats collector (`set_stats_collector`) that, when called, drains
    ///    the counters of every policy in the current snapshot and returns
    ///    them keyed by policy id;
    /// 2. a callback (`subscribe`) that replaces this provider's policies in
    ///    the registry whenever the provider invokes it.
    ///
    /// NOTE(review): the collector iterates the whole snapshot, so it also
    /// resets counters of policies owned by other providers. Confirm this is
    /// intended when several providers are subscribed.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `provider.subscribe`. In that case the
    /// registration made for this call is rolled back, so a failed subscribe
    /// does not leave behind a provider entry the caller has no id for.
    ///
    /// # Panics
    ///
    /// Panics if one of the registry locks is poisoned.
    pub fn subscribe(&self, provider: &dyn PolicyProvider) -> Result<ProviderId, PolicyError> {
        let handle = self.register_provider();
        let provider_id = handle.provider_id();

        let inner = Arc::clone(&self.inner);
        let collector: StatsCollector = Arc::new(move || {
            let snapshot = inner.snapshot();
            snapshot
                .iter()
                .map(|entry| (entry.policy.id().to_string(), entry.stats.reset_all()))
                .collect()
        });
        provider.set_stats_collector(collector);

        let callback = Arc::new(move |policies: Vec<Policy>| {
            handle.update(policies);
        });
        if let Err(err) = provider.subscribe(callback) {
            self.discard_provider(provider_id);
            return Err(err);
        }
        Ok(provider_id)
    }

    /// Removes `provider_id` from the registry. The snapshot is rebuilt only
    /// if the provider had contributed policies.
    fn discard_provider(&self, provider_id: ProviderId) {
        let mut providers = self.inner.providers.write().unwrap();
        if let Some(entries) = providers.remove(&provider_id) {
            if !entries.is_empty() {
                self.inner.rebuild_snapshot(&providers);
            }
        }
    }

    /// Registers a new provider with an empty policy list and returns a
    /// handle for pushing updates.
    pub fn register_provider(&self) -> ProviderHandle {
        let provider_id = self.inner.register_provider();
        ProviderHandle {
            provider_id,
            registry: Arc::clone(&self.inner),
        }
    }

    /// Returns the current snapshot. Later updates do not affect snapshots
    /// that were already handed out.
    pub fn snapshot(&self) -> PolicySnapshot {
        self.inner.snapshot()
    }

    /// Number of providers currently registered.
    ///
    /// # Panics
    ///
    /// Panics if the providers lock is poisoned.
    pub fn provider_count(&self) -> usize {
        self.inner.providers.read().unwrap().len()
    }
}

impl Default for PolicyRegistry {
    /// Equivalent to [`PolicyRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}
// Unit tests for the registry, its snapshots, and the per-policy stats.
#[cfg(test)]
mod tests {
use super::*;
use crate::proto::tero::policy::v1::Policy as ProtoPolicy;
// Builds an enabled policy whose id and name are both `id`; every other
// proto field is left at its default.
fn make_policy(id: &str) -> Policy {
Policy::new(ProtoPolicy {
id: id.to_string(),
name: id.to_string(),
enabled: true,
..Default::default()
})
}
// A freshly created registry yields an empty snapshot.
#[test]
fn empty_registry() {
let registry = PolicyRegistry::new();
let snapshot = registry.snapshot();
assert!(snapshot.is_empty());
assert_eq!(snapshot.len(), 0);
}
// Provider ids are handed out sequentially starting at 0, and each
// registration increases the provider count.
#[test]
fn register_provider() {
let registry = PolicyRegistry::new();
let handle = registry.register_provider();
assert_eq!(registry.provider_count(), 1);
assert_eq!(handle.provider_id(), ProviderId(0));
let handle2 = registry.register_provider();
assert_eq!(registry.provider_count(), 2);
assert_eq!(handle2.provider_id(), ProviderId(1));
}
// Policies pushed through a handle show up in the next snapshot and can be
// looked up by id.
#[test]
fn update_policies() {
let registry = PolicyRegistry::new();
let handle = registry.register_provider();
let policies = vec![make_policy("policy-1"), make_policy("policy-2")];
handle.update(policies);
let snapshot = registry.snapshot();
assert_eq!(snapshot.len(), 2);
assert!(snapshot.get("policy-1").is_some());
assert!(snapshot.get("policy-2").is_some());
}
// Policies from different providers are merged into one snapshot, and each
// entry remembers which provider it came from.
#[test]
fn multiple_providers() {
let registry = PolicyRegistry::new();
let handle1 = registry.register_provider();
let handle2 = registry.register_provider();
handle1.update(vec![make_policy("provider1-policy")]);
handle2.update(vec![make_policy("provider2-policy")]);
let snapshot = registry.snapshot();
assert_eq!(snapshot.len(), 2);
let entry1 = snapshot.get("provider1-policy").unwrap();
let entry2 = snapshot.get("provider2-policy").unwrap();
assert_eq!(entry1.provider_id, ProviderId(0));
assert_eq!(entry2.provider_id, ProviderId(1));
}
// An update replaces the provider's previous policy list instead of
// appending to it.
#[test]
fn update_replaces_policies() {
let registry = PolicyRegistry::new();
let handle = registry.register_provider();
handle.update(vec![make_policy("old-policy")]);
assert_eq!(registry.snapshot().len(), 1);
assert!(registry.snapshot().get("old-policy").is_some());
handle.update(vec![make_policy("new-policy")]);
assert_eq!(registry.snapshot().len(), 1);
assert!(registry.snapshot().get("old-policy").is_none());
assert!(registry.snapshot().get("new-policy").is_some());
}
// Counters of a policy survive an update that re-sends the same policy id.
#[test]
fn stats_preserved_on_update() {
let registry = PolicyRegistry::new();
let handle = registry.register_provider();
handle.update(vec![make_policy("policy-1")]);
let snapshot1 = registry.snapshot();
let entry1 = snapshot1.get("policy-1").unwrap();
entry1.stats.record_hit();
entry1.stats.record_hit();
assert_eq!(entry1.stats.hits(), 2);
handle.update(vec![make_policy("policy-1")]);
let snapshot2 = registry.snapshot();
let entry2 = snapshot2.get("policy-1").unwrap();
assert_eq!(entry2.stats.hits(), 2);
}
// A snapshot taken before an update keeps its original contents; only
// snapshots taken afterwards see the new policies.
#[test]
fn snapshot_is_immutable() {
let registry = PolicyRegistry::new();
let handle = registry.register_provider();
handle.update(vec![make_policy("policy-1")]);
let snapshot1 = registry.snapshot();
assert_eq!(snapshot1.len(), 1);
handle.update(vec![make_policy("policy-1"), make_policy("policy-2")]);
assert_eq!(snapshot1.len(), 1);
let snapshot2 = registry.snapshot();
assert_eq!(snapshot2.len(), 2);
}
// Cloning a snapshot shares the same inner allocation.
#[test]
fn snapshot_clone_is_cheap() {
let registry = PolicyRegistry::new();
let handle = registry.register_provider();
handle.update(vec![make_policy("policy-1")]);
let snapshot1 = registry.snapshot();
let snapshot2 = snapshot1.clone();
assert!(Arc::ptr_eq(&snapshot1.inner, &snapshot2.inner));
}
// Hits and misses are counted, and `reset` returns the counts and zeroes
// the match counters.
#[test]
fn stats_recording() {
let registry = PolicyRegistry::new();
let handle = registry.register_provider();
handle.update(vec![make_policy("policy-1")]);
let snapshot = registry.snapshot();
let entry = snapshot.get("policy-1").unwrap();
entry.stats.record_hit();
entry.stats.record_hit();
entry.stats.record_miss();
assert_eq!(entry.stats.hits(), 2);
assert_eq!(entry.stats.misses(), 1);
let (hits, misses) = entry.stats.reset();
assert_eq!(hits, 2);
assert_eq!(misses, 1);
assert_eq!(entry.stats.hits(), 0);
assert_eq!(entry.stats.misses(), 0);
}
// `iter` visits every policy; membership is checked without assuming an
// order.
#[test]
fn iterate_policies() {
let registry = PolicyRegistry::new();
let handle = registry.register_provider();
handle.update(vec![
make_policy("policy-1"),
make_policy("policy-2"),
make_policy("policy-3"),
]);
let snapshot = registry.snapshot();
let ids: Vec<&str> = snapshot.iter().map(|e| e.policy.id()).collect();
assert_eq!(ids.len(), 3);
assert!(ids.contains(&"policy-1"));
assert!(ids.contains(&"policy-2"));
assert!(ids.contains(&"policy-3"));
}
// Subscribing a file-backed provider loads its policies into the registry.
// The assertions expect the fixture file to yield 16 policies.
#[test]
fn subscribe_to_file_provider() {
use crate::provider::FileProvider;
let registry = PolicyRegistry::new();
let provider = FileProvider::new("testdata/policies.json");
let provider_id = registry.subscribe(&provider).unwrap();
assert_eq!(provider_id, ProviderId(0));
assert_eq!(registry.provider_count(), 1);
let snapshot = registry.snapshot();
assert_eq!(snapshot.len(), 16);
for entry in snapshot.iter() {
assert_eq!(entry.provider_id, provider_id);
}
}
// Two providers reading the same fixture each contribute their own 16
// entries, so the snapshot holds 32 entries even though the ids repeat.
#[test]
fn subscribe_to_multiple_providers() {
use crate::provider::FileProvider;
let registry = PolicyRegistry::new();
let provider1 = FileProvider::new("testdata/policies.json");
let provider2 = FileProvider::new("testdata/policies.json");
let id1 = registry.subscribe(&provider1).unwrap();
let id2 = registry.subscribe(&provider2).unwrap();
assert_ne!(id1, id2);
assert_eq!(registry.provider_count(), 2);
let snapshot = registry.snapshot();
assert_eq!(snapshot.len(), 32);
let provider1_count = snapshot.iter().filter(|e| e.provider_id == id1).count();
let provider2_count = snapshot.iter().filter(|e| e.provider_id == id2).count();
assert_eq!(provider1_count, 16);
assert_eq!(provider2_count, 16);
}
// `policies()` exposes the same entries as a slice.
#[test]
fn snapshot_policies_method() {
let registry = PolicyRegistry::new();
let handle = registry.register_provider();
handle.update(vec![make_policy("policy-1"), make_policy("policy-2")]);
let snapshot = registry.snapshot();
let policies = snapshot.policies();
assert_eq!(policies.len(), 2);
assert!(policies.iter().any(|e| e.policy.id() == "policy-1"));
assert!(policies.iter().any(|e| e.policy.id() == "policy-2"));
}
// `Default` produces the same empty registry as `new`.
#[test]
fn registry_default() {
let registry = PolicyRegistry::default();
assert!(registry.snapshot().is_empty());
assert_eq!(registry.provider_count(), 0);
}
// `subscribe` installs a stats collector on the provider; calling it returns
// the recorded counts and resets them, so a second call reports zeros.
#[test]
fn subscribe_auto_wires_stats_collector() {
use crate::provider::{PolicyCallback, PolicyProvider, StatsCollector};
use std::sync::RwLock;
// Test double that stores the collector it is given and delivers a fixed
// policy list synchronously from `subscribe`.
struct SpyProvider {
policies: Vec<Policy>,
collector: RwLock<Option<StatsCollector>>,
}
impl PolicyProvider for SpyProvider {
fn set_stats_collector(&self, collector: StatsCollector) {
*self.collector.write().unwrap() = Some(collector);
}
fn subscribe(&self, callback: PolicyCallback) -> Result<(), PolicyError> {
callback(self.policies.clone());
Ok(())
}
}
let provider = SpyProvider {
policies: vec![make_policy("policy-1"), make_policy("policy-2")],
collector: RwLock::new(None),
};
let registry = PolicyRegistry::new();
registry.subscribe(&provider).unwrap();
let collector = provider.collector.read().unwrap();
assert!(collector.is_some(), "stats collector should be auto-wired");
let snapshot = registry.snapshot();
let entry = snapshot.get("policy-1").unwrap();
entry.stats.record_hit();
entry.stats.record_hit();
entry.stats.record_miss();
// First collection reports the recorded counts for policy-1.
let stats = collector.as_ref().unwrap()();
assert_eq!(stats.len(), 2);
let p1_stats = stats.iter().find(|(id, _)| id == "policy-1").unwrap();
assert_eq!(p1_stats.1.match_hits, 2);
assert_eq!(p1_stats.1.match_misses, 1);
// Second collection sees zeros because the first one drained the counters.
let stats_again = collector.as_ref().unwrap()();
let p1_again = stats_again.iter().find(|(id, _)| id == "policy-1").unwrap();
assert_eq!(p1_again.1.match_hits, 0);
assert_eq!(p1_again.1.match_misses, 0);
}
}