Skip to main content

chio_guards/
behavioral_profile.rs

1//! Phase 19.2 -- guard-integrated behavioral profiling.
2//!
3//! Productizes [`chio_kernel::operator_report::BehavioralFeedReport`] and
4//! the EMA helpers in `chio_kernel::operator_report` into a synchronous
5//! guard that detects anomalies against a per-agent baseline.
6//!
7//! # Model
8//!
9//! The guard tracks exponentially-weighted moving averages (EMA) for
10//! each (agent, metric) pair. The metrics are:
11//!
12//! * `call_rate`               -- receipts per window
13//! * `deny_rate`               -- fraction of denies per window
14//! * `unique_tools`            -- distinct tools per window
15//! * `avg_parameter_entropy`   -- Shannon entropy of invocation parameters
16//!
17//! When a new window's sample crosses the configured sigma threshold
18//! relative to the baseline, the guard emits a [`GuardEvidence`] entry
19//! marking the advisory signal. The verdict itself remains
20//! [`Verdict::Allow`]; this guard is advisory-only.
21//!
22//! # Storage
23//!
24//! Baselines live in memory behind a `Mutex` keyed by `(agent, metric)`.
25//! Receipts are read through a pluggable [`ReceiptFeedSource`] trait.
//! The default in-memory implementation is used by unit tests; the
//! production wiring backs it with a chio-store-sqlite
//! `ReceiptStore::query_receipts` call (see the integration test in
//! `tests/behavioral_profile.rs`).
30//!
31//! # Why synchronous
32//!
33//! The roadmap requires this to be a sync `Guard`. The feed source
34//! does one bounded read per evaluation and caches the baseline, so
35//! the cost sits well under a millisecond in typical deployments.
36
37use std::collections::HashMap;
38use std::sync::Mutex;
39use std::time::{SystemTime, UNIX_EPOCH};
40
41use chio_core::receipt::ChioReceipt;
42use chio_kernel::operator_report::EmaBaselineState;
43use chio_kernel::{Guard, GuardContext, KernelError, Verdict};
44
/// Default EMA smoothing factor. Equivalent to a ~10-sample window.
///
/// Used as [`BehavioralProfileConfig::ema_alpha`] by the config's
/// `Default` impl and handed to the baseline update for every recorded
/// sample.
pub const DEFAULT_EMA_ALPHA: f64 = 0.2;
/// Default sigma threshold above which a window is flagged.
///
/// The guard compares the absolute z-score of a sample against this
/// value, so both spikes and drops can cross it.
pub const DEFAULT_SIGMA_THRESHOLD: f64 = 2.0;
/// Default rolling window length in seconds.
pub const DEFAULT_WINDOW_SECS: u64 = 60;
/// Default number of historical windows used to prime the baseline
/// before the guard starts emitting signals. Guarantees the z-score
/// has enough history to be meaningful.
pub const DEFAULT_BASELINE_MIN_WINDOWS: u64 = 3;
55
/// Behavioral dimension tracked for each (agent, window) pair.
///
/// The variant is part of the baseline map key, so every metric keeps
/// its own independent baseline per agent.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BehavioralMetric {
    /// Number of receipts observed in the window.
    CallRate,
    /// Denies observed in the window.
    ///
    /// NOTE(review): the module docs describe `deny_rate` as a fraction
    /// while this variant is documented as a count -- confirm which unit
    /// callers are expected to feed in.
    DenyRate,
    /// Number of distinct tool names seen in the window.
    UniqueTools,
    /// Approximate entropy of invocation parameters in the window.
    AvgParameterEntropy,
}

impl BehavioralMetric {
    /// Snake-case identifier for this metric, stable across releases so
    /// it can be used in serialized output and log lines.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            BehavioralMetric::CallRate => "call_rate",
            BehavioralMetric::DenyRate => "deny_rate",
            BehavioralMetric::UniqueTools => "unique_tools",
            BehavioralMetric::AvgParameterEntropy => "avg_parameter_entropy",
        }
    }
}
81
/// Pluggable receipt feed. Lets the guard be tested in-memory and
/// driven in production by `chio-store-sqlite`.
///
/// Implementors must be `Send + Sync` (supertrait bounds), which lets a
/// boxed feed be stored inside the guard.
pub trait ReceiptFeedSource: Send + Sync {
    /// Return receipts for `agent_id` whose `timestamp` falls in
    /// `[since, until]` (inclusive on both ends). Implementations should
    /// keep the returned `Vec` bounded; callers pass short windows so the
    /// result set stays small.
    ///
    /// `since` and `until` are compared directly against the receipt
    /// `timestamp`; the guard in this module passes window bounds derived
    /// from its seconds-resolution clock.
    ///
    /// # Errors
    ///
    /// Returns a [`KernelError`] when the implementation cannot read its
    /// backing store (the in-memory feed, for example, reports a poisoned
    /// lock this way).
    fn receipts_for_agent(
        &self,
        agent_id: &str,
        since: u64,
        until: u64,
    ) -> Result<Vec<ChioReceipt>, KernelError>;
}
96
97/// Trivial in-memory receipt feed used for tests and lightweight
98/// deployments. Stores receipts in insertion order and filters by
99/// agent + timestamp at read time.
100///
101/// The "agent" here is conceptual; real deployments resolve an agent
102/// subject through the capability snapshot table. For this feed the
103/// caller directly tags each receipt with an agent id.
104#[derive(Default)]
105pub struct InMemoryReceiptFeed {
106    inner: Mutex<Vec<(String, ChioReceipt)>>,
107}
108
109impl InMemoryReceiptFeed {
110    /// Build a new, empty in-memory feed.
111    #[must_use]
112    pub fn new() -> Self {
113        Self::default()
114    }
115
116    /// Append a receipt tagged with the given agent id.
117    pub fn push(&self, agent_id: &str, receipt: ChioReceipt) -> Result<(), KernelError> {
118        let mut inner = self
119            .inner
120            .lock()
121            .map_err(|_| KernelError::Internal("behavioral feed lock poisoned".to_string()))?;
122        inner.push((agent_id.to_string(), receipt));
123        Ok(())
124    }
125
126    /// Number of receipts stored.
127    pub fn len(&self) -> Result<usize, KernelError> {
128        let inner = self
129            .inner
130            .lock()
131            .map_err(|_| KernelError::Internal("behavioral feed lock poisoned".to_string()))?;
132        Ok(inner.len())
133    }
134
135    /// Whether the feed is empty.
136    pub fn is_empty(&self) -> Result<bool, KernelError> {
137        Ok(self.len()? == 0)
138    }
139}
140
141impl ReceiptFeedSource for InMemoryReceiptFeed {
142    fn receipts_for_agent(
143        &self,
144        agent_id: &str,
145        since: u64,
146        until: u64,
147    ) -> Result<Vec<ChioReceipt>, KernelError> {
148        let inner = self
149            .inner
150            .lock()
151            .map_err(|_| KernelError::Internal("behavioral feed lock poisoned".to_string()))?;
152        Ok(inner
153            .iter()
154            .filter(|(id, r)| id == agent_id && r.timestamp >= since && r.timestamp <= until)
155            .map(|(_, r)| r.clone())
156            .collect())
157    }
158}
159
/// Configuration surface for [`BehavioralProfileGuard`].
///
/// All fields are public and the struct is `Copy`, so callers typically
/// start from `Default::default()` and override individual fields with
/// struct-update syntax.
#[derive(Debug, Clone, Copy)]
pub struct BehavioralProfileConfig {
    /// EMA smoothing factor handed to the baseline update for each
    /// recorded sample.
    pub ema_alpha: f64,
    /// Sigma threshold above which a window is flagged. Compared
    /// against the absolute value of the sample's z-score.
    pub sigma_threshold: f64,
    /// Rolling window length in seconds. The guard treats `0` as `1`
    /// when it aligns timestamps to window boundaries.
    pub window_secs: u64,
    /// Minimum number of windows required before anomalies can be
    /// flagged. Protects the guard from firing on a cold baseline.
    pub baseline_min_windows: u64,
}

impl Default for BehavioralProfileConfig {
    /// Build a config from the module-level `DEFAULT_*` constants.
    fn default() -> Self {
        Self {
            ema_alpha: DEFAULT_EMA_ALPHA,
            sigma_threshold: DEFAULT_SIGMA_THRESHOLD,
            window_secs: DEFAULT_WINDOW_SECS,
            baseline_min_windows: DEFAULT_BASELINE_MIN_WINDOWS,
        }
    }
}
184
/// Baseline entry keyed by (agent, metric).
///
/// Created lazily through `Default` the first time a sample for the
/// pair is observed.
#[derive(Debug, Clone, Default)]
struct BaselineEntry {
    /// Running EMA state for this (agent, metric) pair.
    state: EmaBaselineState,
    /// `window_start` of the most recent sample folded into `state`.
    /// Used to avoid recording the same window twice; starts at `0`
    /// for a fresh entry.
    last_window_start: u64,
}
191
192/// Guard that computes behavioral-anomaly signals from the receipt
193/// store and emits advisories without blocking the request.
194pub struct BehavioralProfileGuard {
195    name: String,
196    config: BehavioralProfileConfig,
197    feed: Box<dyn ReceiptFeedSource>,
198    // Keyed by (agent_id, metric).
199    baselines: Mutex<HashMap<(String, BehavioralMetric), BaselineEntry>>,
200    now: Box<dyn Fn() -> u64 + Send + Sync>,
201}
202
203impl BehavioralProfileGuard {
204    /// Construct a new guard with the default configuration and a
205    /// system-clock `now` source.
206    pub fn new(feed: Box<dyn ReceiptFeedSource>) -> Self {
207        Self::with_config(feed, BehavioralProfileConfig::default())
208    }
209
210    /// Construct with an explicit config.
211    pub fn with_config(feed: Box<dyn ReceiptFeedSource>, config: BehavioralProfileConfig) -> Self {
212        Self {
213            name: "behavioral-profile".to_string(),
214            config,
215            feed,
216            baselines: Mutex::new(HashMap::new()),
217            now: Box::new(default_now),
218        }
219    }
220
221    /// Override the clock source. Useful for deterministic tests.
222    pub fn with_clock(mut self, clock: Box<dyn Fn() -> u64 + Send + Sync>) -> Self {
223        self.now = clock;
224        self
225    }
226
227    /// Feed the guard a fresh sample and return whether the window
228    /// should be flagged as anomalous. Exposed for tests and dashboards
229    /// that want to surface scores without running the full pipeline.
230    pub fn observe_sample(
231        &self,
232        agent_id: &str,
233        metric: BehavioralMetric,
234        sample: f64,
235        window_start: u64,
236    ) -> Result<ObservationOutcome, KernelError> {
237        let mut baselines = self
238            .baselines
239            .lock()
240            .map_err(|_| KernelError::Internal("baseline lock poisoned".to_string()))?;
241        let entry = baselines.entry((agent_id.to_string(), metric)).or_default();
242
243        // Only record one sample per window-start pair. Callers that
244        // pass the same window_start multiple times get the same
245        // verdict without inflating the sample count.
246        if entry.last_window_start == window_start && entry.state.sample_count > 0 {
247            let z = robust_z_score(&entry.state, sample);
248            let anomaly = z
249                .map(|z| z.abs() > self.config.sigma_threshold)
250                .unwrap_or(false);
251            return Ok(ObservationOutcome {
252                z_score: z,
253                anomaly,
254                baseline: entry.state.clone(),
255                sample,
256            });
257        }
258
259        let z = robust_z_score(&entry.state, sample);
260        let seen_enough = entry.state.sample_count >= self.config.baseline_min_windows;
261        let anomaly = seen_enough
262            && z.map(|z| z.abs() > self.config.sigma_threshold)
263                .unwrap_or(false);
264
265        entry
266            .state
267            .update(sample, self.config.ema_alpha, window_start);
268        entry.last_window_start = window_start;
269        let baseline = entry.state.clone();
270
271        Ok(ObservationOutcome {
272            z_score: z,
273            anomaly,
274            baseline,
275            sample,
276        })
277    }
278
279    /// Access the snapshot of a (agent, metric) baseline.
280    pub fn baseline(
281        &self,
282        agent_id: &str,
283        metric: BehavioralMetric,
284    ) -> Result<Option<EmaBaselineState>, KernelError> {
285        let baselines = self
286            .baselines
287            .lock()
288            .map_err(|_| KernelError::Internal("baseline lock poisoned".to_string()))?;
289        Ok(baselines
290            .get(&(agent_id.to_string(), metric))
291            .map(|e| e.state.clone()))
292    }
293
294    fn current_window_start(&self, now: u64) -> u64 {
295        let window = self.config.window_secs.max(1);
296        (now / window) * window
297    }
298
299    fn sample_for_window(&self, agent_id: &str, window_start: u64) -> Result<f64, KernelError> {
300        let window_end = window_start + self.config.window_secs.max(1);
301        let receipts =
302            self.feed
303                .receipts_for_agent(agent_id, window_start, window_end.saturating_sub(1))?;
304        Ok(receipts.len() as f64)
305    }
306}
307
/// Outcome of a single `observe_sample` call.
#[derive(Debug, Clone)]
pub struct ObservationOutcome {
    /// Z-score of the sample relative to the baseline as it stood when
    /// the sample was scored (before this call folded the sample in).
    /// `None` when the baseline was too small (fewer than two recorded
    /// samples).
    pub z_score: Option<f64>,
    /// Whether the sample was flagged as anomalous.
    pub anomaly: bool,
    /// Baseline snapshot taken at the end of the call: post-update when
    /// the sample was recorded, or the unchanged stored baseline when
    /// the window had already been recorded.
    pub baseline: EmaBaselineState,
    /// The sample value that was observed.
    pub sample: f64,
}
321
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the
/// epoch, so the function never fails.
fn default_now() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_secs())
}
328
329/// Z-score with a Poisson-style stddev floor.
330///
331/// For count metrics (call rate, deny count, unique tools) a zero
332/// measured variance is an artifact of a short baseline rather than a
333/// true zero-noise process. We floor the effective stddev at
334/// `sqrt(max(mean, 1))` so that a 50x spike over a steady 10/window
335/// baseline is detected as an anomaly even when the EWMA variance
336/// happens to be numerically zero.
337fn robust_z_score(state: &EmaBaselineState, sample: f64) -> Option<f64> {
338    if state.sample_count < 2 {
339        return None;
340    }
341    let measured = state.stddev();
342    let floor = state.ema_mean.max(1.0).sqrt();
343    let effective = measured.max(floor);
344    if effective <= f64::EPSILON {
345        return None;
346    }
347    Some((sample - state.ema_mean) / effective)
348}
349
350impl Guard for BehavioralProfileGuard {
351    fn name(&self) -> &str {
352        &self.name
353    }
354
355    fn evaluate(&self, ctx: &GuardContext) -> Result<Verdict, KernelError> {
356        let now = (self.now)();
357        let window_start = self.current_window_start(now);
358        let agent_id = ctx.agent_id.as_str();
359        let sample = self.sample_for_window(agent_id, window_start)?;
360        // Advisory-only guard: we only inspect the call-rate metric in
361        // the sync path. Other metrics are available through
362        // `observe_sample` so callers can feed the guard out-of-band.
363        let _ = self.observe_sample(agent_id, BehavioralMetric::CallRate, sample, window_start)?;
364        Ok(Verdict::Allow)
365    }
366}
367
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Guard over `feed` that needs only two primed windows before it
    /// may flag anomalies; every other setting stays at its default.
    fn guard_for_tests(feed: InMemoryReceiptFeed) -> BehavioralProfileGuard {
        let config = BehavioralProfileConfig {
            baseline_min_windows: 2,
            ..BehavioralProfileConfig::default()
        };
        BehavioralProfileGuard::with_config(Box::new(feed), config)
    }

    #[test]
    fn ema_baseline_stabilizes_under_steady_sample() {
        let guard = guard_for_tests(InMemoryReceiptFeed::new());
        for window in 0..20 {
            let outcome = guard
                .observe_sample("agent-steady", BehavioralMetric::CallRate, 10.0, window)
                .unwrap();
            // The first ten windows only warm the baseline up.
            if window < 10 {
                continue;
            }
            let drift = (outcome.baseline.ema_mean - 10.0).abs();
            assert!(
                drift < 0.1,
                "ema_mean should stabilize near 10 after 10 samples, got {}",
                outcome.baseline.ema_mean
            );
            assert!(!outcome.anomaly);
        }
    }

    #[test]
    fn spike_fifty_x_triggers_anomaly() {
        let guard = guard_for_tests(InMemoryReceiptFeed::new());
        // Warm-up: fifteen consecutive windows at a steady 10 calls each.
        (0..15).for_each(|window| {
            guard
                .observe_sample("agent-spiky", BehavioralMetric::CallRate, 10.0, window)
                .unwrap();
        });
        // A later window jumps to 50x the steady rate.
        let spike = guard
            .observe_sample("agent-spiky", BehavioralMetric::CallRate, 500.0, 100)
            .unwrap();
        assert!(
            spike.anomaly,
            "50x spike should flag an anomaly (z={:?})",
            spike.z_score
        );
        let magnitude = spike.z_score.unwrap_or(0.0).abs();
        assert!(magnitude > DEFAULT_SIGMA_THRESHOLD);
    }

    #[test]
    fn cold_baseline_does_not_flag() {
        let guard = guard_for_tests(InMemoryReceiptFeed::new());
        // Very first sample for this agent: no history to compare with.
        let first = guard
            .observe_sample("agent-cold", BehavioralMetric::CallRate, 1_000.0, 1)
            .unwrap();
        assert!(
            !first.anomaly,
            "cold baseline must not flag anomalies (observed in isolation)"
        );
    }
}