use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use chio_core::receipt::ChioReceipt;
use chio_kernel::operator_report::EmaBaselineState;
use chio_kernel::{Guard, GuardContext, KernelError, Verdict};
/// Default for `BehavioralProfileConfig::ema_alpha`, the smoothing argument
/// handed to `EmaBaselineState::update` whenever a sample is recorded.
pub const DEFAULT_EMA_ALPHA: f64 = 0.2;
/// Default for `BehavioralProfileConfig::sigma_threshold`: an observation is a
/// candidate anomaly when its absolute z-score is strictly greater than this.
pub const DEFAULT_SIGMA_THRESHOLD: f64 = 2.0;
/// Default for `BehavioralProfileConfig::window_secs`, the width of one
/// sampling window.
pub const DEFAULT_WINDOW_SECS: u64 = 60;
/// Default for `BehavioralProfileConfig::baseline_min_windows`, the number of
/// samples a baseline must already hold before an observation that opens a
/// new window can be flagged.
pub const DEFAULT_BASELINE_MIN_WINDOWS: u64 = 3;
/// Identifies which behavioral signal a baseline tracks.
///
/// Baselines are keyed by `(agent id, metric)`, so each variant gets its own
/// independent history per agent.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BehavioralMetric {
    /// Labelled `call_rate`.
    CallRate,
    /// Labelled `deny_rate`.
    DenyRate,
    /// Labelled `unique_tools`.
    UniqueTools,
    /// Labelled `avg_parameter_entropy`.
    AvgParameterEntropy,
}

impl BehavioralMetric {
    /// Returns the stable snake_case label for this metric.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        let label = match *self {
            BehavioralMetric::AvgParameterEntropy => "avg_parameter_entropy",
            BehavioralMetric::UniqueTools => "unique_tools",
            BehavioralMetric::DenyRate => "deny_rate",
            BehavioralMetric::CallRate => "call_rate",
        };
        label
    }
}
/// Supplies the receipts recorded for an agent.
///
/// `BehavioralProfileGuard` queries a boxed implementation of this trait to
/// count an agent's receipts in the current window. Implementations must be
/// `Send + Sync`.
pub trait ReceiptFeedSource: Send + Sync {
/// Returns the receipts attributed to `agent_id` between `since` and `until`.
///
/// The in-memory implementation in this file treats both bounds as
/// inclusive, and the guard passes `window_end - 1` as `until`, which relies
/// on that reading.
/// NOTE(review): the bounds are presumably Unix seconds, matching the
/// guard's default clock — confirm against the receipt timestamp's unit.
///
/// # Errors
/// Implementations report failures as a `KernelError`.
fn receipts_for_agent(
&self,
agent_id: &str,
since: u64,
until: u64,
) -> Result<Vec<ChioReceipt>, KernelError>;
}
/// Receipt feed that keeps every pushed receipt in memory.
///
/// Entries are stored in insertion order as `(agent id, receipt)` pairs behind
/// a mutex, so the feed can be shared through `&self`.
#[derive(Default)]
pub struct InMemoryReceiptFeed {
/// All receipts pushed so far, each tagged with the owning agent id.
inner: Mutex<Vec<(String, ChioReceipt)>>,
}
impl InMemoryReceiptFeed {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn push(&self, agent_id: &str, receipt: ChioReceipt) -> Result<(), KernelError> {
let mut inner = self
.inner
.lock()
.map_err(|_| KernelError::Internal("behavioral feed lock poisoned".to_string()))?;
inner.push((agent_id.to_string(), receipt));
Ok(())
}
pub fn len(&self) -> Result<usize, KernelError> {
let inner = self
.inner
.lock()
.map_err(|_| KernelError::Internal("behavioral feed lock poisoned".to_string()))?;
Ok(inner.len())
}
pub fn is_empty(&self) -> Result<bool, KernelError> {
Ok(self.len()? == 0)
}
}
impl ReceiptFeedSource for InMemoryReceiptFeed {
    /// Returns clones of the stored receipts that belong to `agent_id` and
    /// whose timestamp lies in the inclusive range `since..=until`, in
    /// insertion order.
    ///
    /// # Errors
    /// Returns `KernelError::Internal` if the feed mutex is poisoned.
    fn receipts_for_agent(
        &self,
        agent_id: &str,
        since: u64,
        until: u64,
    ) -> Result<Vec<ChioReceipt>, KernelError> {
        let entries = match self.inner.lock() {
            Ok(entries) => entries,
            Err(_) => {
                return Err(KernelError::Internal(
                    "behavioral feed lock poisoned".to_string(),
                ))
            }
        };

        let mut matching = Vec::new();
        for (owner, receipt) in entries.iter() {
            if owner != agent_id {
                continue;
            }
            if receipt.timestamp >= since && receipt.timestamp <= until {
                matching.push(receipt.clone());
            }
        }
        Ok(matching)
    }
}
/// Tunable parameters for `BehavioralProfileGuard`.
#[derive(Debug, Clone, Copy)]
pub struct BehavioralProfileConfig {
/// Smoothing argument passed to `EmaBaselineState::update` for each
/// recorded sample.
pub ema_alpha: f64,
/// An observation is a candidate anomaly when its absolute z-score is
/// strictly greater than this value.
pub sigma_threshold: f64,
/// Width of one sampling window; the guard treats `0` as `1`.
pub window_secs: u64,
/// Number of samples a baseline must already hold before an observation
/// that opens a new window can be flagged as an anomaly.
pub baseline_min_windows: u64,
}
impl Default for BehavioralProfileConfig {
    /// Builds the configuration from the module-level `DEFAULT_*` constants.
    fn default() -> Self {
        let ema_alpha = DEFAULT_EMA_ALPHA;
        let sigma_threshold = DEFAULT_SIGMA_THRESHOLD;
        let window_secs = DEFAULT_WINDOW_SECS;
        let baseline_min_windows = DEFAULT_BASELINE_MIN_WINDOWS;
        Self {
            ema_alpha,
            sigma_threshold,
            window_secs,
            baseline_min_windows,
        }
    }
}
/// Per-`(agent, metric)` bookkeeping kept by `BehavioralProfileGuard`.
#[derive(Debug, Clone, Default)]
struct BaselineEntry {
/// Running baseline the samples are folded into.
state: EmaBaselineState,
/// `window_start` of the most recent sample folded into `state`; stays at
/// the default `0` until the first update.
last_window_start: u64,
}
/// Guard that maintains per-agent, per-metric baselines and scores new
/// samples against them.
pub struct BehavioralProfileGuard {
/// Name reported through `Guard::name`.
name: String,
/// Thresholds and window settings used when scoring samples.
config: BehavioralProfileConfig,
/// Source queried for an agent's receipts in the current window.
feed: Box<dyn ReceiptFeedSource>,
/// Baselines keyed by `(agent id, metric)`.
baselines: Mutex<HashMap<(String, BehavioralMetric), BaselineEntry>>,
/// Clock used to determine the current window; the constructors install
/// `default_now`, which yields seconds since the Unix epoch.
now: Box<dyn Fn() -> u64 + Send + Sync>,
}
impl BehavioralProfileGuard {
    /// Creates a guard over `feed` using `BehavioralProfileConfig::default()`.
    #[must_use]
    pub fn new(feed: Box<dyn ReceiptFeedSource>) -> Self {
        Self::with_config(feed, BehavioralProfileConfig::default())
    }

    /// Creates a guard over `feed` with an explicit configuration.
    ///
    /// The guard is named `"behavioral-profile"`, starts with no baselines
    /// and reads time from `default_now` until `with_clock` replaces it.
    #[must_use]
    pub fn with_config(feed: Box<dyn ReceiptFeedSource>, config: BehavioralProfileConfig) -> Self {
        Self {
            name: "behavioral-profile".to_string(),
            config,
            feed,
            baselines: Mutex::new(HashMap::new()),
            now: Box::new(default_now),
        }
    }

    /// Replaces the clock used to pick the current window and returns the
    /// guard, builder-style.
    #[must_use]
    pub fn with_clock(mut self, clock: Box<dyn Fn() -> u64 + Send + Sync>) -> Self {
        self.now = clock;
        self
    }

    /// Scores `sample` against the baseline of `(agent_id, metric)` and, when
    /// `window_start` has not been recorded yet, folds the sample into it.
    ///
    /// The z-score is always computed against the baseline as it stood before
    /// this call. The observation is flagged only when that baseline already
    /// holds at least `baseline_min_windows` samples and the absolute z-score
    /// is strictly greater than `sigma_threshold`. The warm-up requirement
    /// applies to repeat observations of an already recorded window as well,
    /// so an under-trained baseline never raises an anomaly.
    ///
    /// A window counts as already recorded when it equals the window of the
    /// last update and the baseline holds at least one sample. In that case
    /// the baseline is left untouched, so consecutive observations of the same
    /// window contribute a single sample.
    ///
    /// The returned `baseline` is a snapshot taken after any update.
    ///
    /// # Errors
    /// Returns `KernelError::Internal` if the baseline mutex is poisoned.
    pub fn observe_sample(
        &self,
        agent_id: &str,
        metric: BehavioralMetric,
        sample: f64,
        window_start: u64,
    ) -> Result<ObservationOutcome, KernelError> {
        let mut baselines = self
            .baselines
            .lock()
            .map_err(|_| KernelError::Internal("baseline lock poisoned".to_string()))?;
        let entry = baselines.entry((agent_id.to_string(), metric)).or_default();

        // Score first: both the z-score and the warm-up check must see the
        // baseline before this sample is folded in.
        let z_score = robust_z_score(&entry.state, sample);
        let seen_enough = entry.state.sample_count >= self.config.baseline_min_windows;
        let anomaly =
            seen_enough && matches!(z_score, Some(z) if z.abs() > self.config.sigma_threshold);

        // `last_window_start` defaults to 0, so the sample-count check keeps a
        // fresh entry from mistaking window 0 for an already recorded window.
        let already_recorded =
            entry.last_window_start == window_start && entry.state.sample_count > 0;
        if !already_recorded {
            entry
                .state
                .update(sample, self.config.ema_alpha, window_start);
            entry.last_window_start = window_start;
        }

        Ok(ObservationOutcome {
            z_score,
            anomaly,
            baseline: entry.state.clone(),
            sample,
        })
    }

    /// Returns a copy of the baseline recorded for `(agent_id, metric)`, or
    /// `None` when nothing has been observed for that pair.
    ///
    /// # Errors
    /// Returns `KernelError::Internal` if the baseline mutex is poisoned.
    pub fn baseline(
        &self,
        agent_id: &str,
        metric: BehavioralMetric,
    ) -> Result<Option<EmaBaselineState>, KernelError> {
        let baselines = self
            .baselines
            .lock()
            .map_err(|_| KernelError::Internal("baseline lock poisoned".to_string()))?;
        Ok(baselines
            .get(&(agent_id.to_string(), metric))
            .map(|e| e.state.clone()))
    }

    /// Rounds `now` down to the start of its window; a configured width of
    /// `0` is treated as `1` to avoid dividing by zero.
    fn current_window_start(&self, now: u64) -> u64 {
        let window = self.config.window_secs.max(1);
        (now / window) * window
    }

    /// Counts the receipts the feed holds for `agent_id` in the window that
    /// begins at `window_start`.
    ///
    /// The feed is queried with the inclusive range
    /// `window_start..=window_end - 1`. The end of the window is computed with
    /// saturating arithmetic so an extreme clock value cannot overflow.
    ///
    /// # Errors
    /// Propagates any `KernelError` returned by the feed.
    fn sample_for_window(&self, agent_id: &str, window_start: u64) -> Result<f64, KernelError> {
        let window_end = window_start.saturating_add(self.config.window_secs.max(1));
        let receipts =
            self.feed
                .receipts_for_agent(agent_id, window_start, window_end.saturating_sub(1))?;
        Ok(receipts.len() as f64)
    }
}
/// Result of scoring one sample with `BehavioralProfileGuard::observe_sample`.
#[derive(Debug, Clone)]
pub struct ObservationOutcome {
/// Z-score of the sample relative to the baseline as it stood before the
/// sample was folded in; `None` when no score could be computed (for
/// example, fewer than two samples recorded).
pub z_score: Option<f64>,
/// Whether the observation was flagged as anomalous.
pub anomaly: bool,
/// Snapshot of the baseline taken after the observation was processed.
pub baseline: EmaBaselineState,
/// The observed value, echoed back unchanged.
pub sample: f64,
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn default_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Computes how far `sample` lies from the baseline mean, in units of a
/// floored standard deviation.
///
/// Returns `None` while the baseline holds fewer than two samples. The
/// divisor is the larger of the baseline's standard deviation and
/// `sqrt(max(ema_mean, 1.0))`, so a baseline with little or no measured
/// variance cannot inflate the score. The final epsilon check is a defensive
/// guard against dividing by a vanishing spread.
fn robust_z_score(state: &EmaBaselineState, sample: f64) -> Option<f64> {
    if state.sample_count < 2 {
        return None;
    }

    let spread = {
        let observed = state.stddev();
        let minimum = state.ema_mean.max(1.0).sqrt();
        observed.max(minimum)
    };

    if spread <= f64::EPSILON {
        None
    } else {
        Some((sample - state.ema_mean) / spread)
    }
}
impl Guard for BehavioralProfileGuard {
    /// Returns the guard's configured name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Records the agent's receipt count for the current window under
    /// `BehavioralMetric::CallRate`.
    ///
    /// The observation outcome is discarded: when no error occurs the verdict
    /// is always `Verdict::Allow`.
    ///
    /// # Errors
    /// Propagates any `KernelError` from the receipt feed or from recording
    /// the sample.
    fn evaluate(&self, ctx: &GuardContext) -> Result<Verdict, KernelError> {
        let window = self.current_window_start((self.now)());
        let agent = ctx.agent_id.as_str();
        let call_count = self.sample_for_window(agent, window)?;
        self.observe_sample(agent, BehavioralMetric::CallRate, call_count, window)?;
        Ok(Verdict::Allow)
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Builds a guard over `feed` with the default configuration, except that
    /// only two recorded samples are required before anomalies may be flagged.
    fn guard_for_tests(feed: InMemoryReceiptFeed) -> BehavioralProfileGuard {
        let config = BehavioralProfileConfig {
            baseline_min_windows: 2,
            ..Default::default()
        };
        BehavioralProfileGuard::with_config(Box::new(feed), config)
    }

    /// Feeds the same value in twenty distinct windows and checks that, from
    /// the eleventh onward, the mean sits near it and nothing is flagged.
    #[test]
    fn ema_baseline_stabilizes_under_steady_sample() {
        let guard = guard_for_tests(InMemoryReceiptFeed::new());
        for window in 0..20 {
            let outcome = guard
                .observe_sample("agent-steady", BehavioralMetric::CallRate, 10.0, window)
                .unwrap();
            if window < 10 {
                continue;
            }
            let mean = outcome.baseline.ema_mean;
            assert!(
                (mean - 10.0).abs() < 0.1,
                "ema_mean should stabilize near 10 after 10 samples, got {}",
                mean
            );
            assert!(!outcome.anomaly);
        }
    }

    /// Warms the baseline with fifteen steady windows, then checks that a
    /// sample fifty times larger is flagged.
    #[test]
    fn spike_fifty_x_triggers_anomaly() {
        let guard = guard_for_tests(InMemoryReceiptFeed::new());
        for window in 0..15 {
            guard
                .observe_sample("agent-spiky", BehavioralMetric::CallRate, 10.0, window)
                .unwrap();
        }

        let spike = guard
            .observe_sample("agent-spiky", BehavioralMetric::CallRate, 500.0, 100)
            .unwrap();
        assert!(
            spike.anomaly,
            "50x spike should flag an anomaly (z={:?})",
            spike.z_score
        );
        let magnitude = spike.z_score.unwrap_or(0.0).abs();
        assert!(magnitude > DEFAULT_SIGMA_THRESHOLD);
    }

    /// Checks that the very first observation for an agent is never flagged,
    /// however large the sample.
    #[test]
    fn cold_baseline_does_not_flag() {
        let guard = guard_for_tests(InMemoryReceiptFeed::new());
        let first = guard
            .observe_sample("agent-cold", BehavioralMetric::CallRate, 1_000.0, 1)
            .unwrap();
        assert!(
            !first.anomaly,
            "cold baseline must not flag anomalies (observed in isolation)"
        );
    }
}