use std::sync::Arc;
use chio_kernel::{Guard, GuardContext, KernelError, Verdict};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AdvisorySeverity {
Info,
Low,
Medium,
High,
Critical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvisorySignal {
pub guard_name: String,
pub description: String,
pub severity: AdvisorySeverity,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
#[serde(default)]
pub promoted: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum GuardOutput {
Deterministic {
guard_name: String,
verdict: bool,
details: Option<String>,
},
Advisory(AdvisorySignal),
}
pub trait AdvisoryGuard: Send + Sync {
fn name(&self) -> &str;
fn evaluate(&self, ctx: &GuardContext) -> Result<Vec<AdvisorySignal>, KernelError>;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromotionRule {
pub guard_name: String,
pub min_severity: AdvisorySeverity,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PromotionPolicy {
pub rules: Vec<PromotionRule>,
}
impl PromotionPolicy {
pub fn new() -> Self {
Self { rules: Vec::new() }
}
pub fn add_rule(&mut self, rule: PromotionRule) {
self.rules.push(rule);
}
pub fn should_promote(&self, signal: &AdvisorySignal) -> bool {
for rule in &self.rules {
if rule.guard_name == signal.guard_name
&& severity_ord(signal.severity) >= severity_ord(rule.min_severity)
{
return true;
}
}
false
}
}
fn severity_ord(s: AdvisorySeverity) -> u8 {
match s {
AdvisorySeverity::Info => 0,
AdvisorySeverity::Low => 1,
AdvisorySeverity::Medium => 2,
AdvisorySeverity::High => 3,
AdvisorySeverity::Critical => 4,
}
}
pub struct AdvisoryPipeline {
guards: Vec<Box<dyn AdvisoryGuard>>,
policy: PromotionPolicy,
signals: std::sync::Mutex<Vec<AdvisorySignal>>,
}
impl AdvisoryPipeline {
pub fn new(policy: PromotionPolicy) -> Self {
Self {
guards: Vec::new(),
policy,
signals: std::sync::Mutex::new(Vec::new()),
}
}
pub fn add(&mut self, guard: Box<dyn AdvisoryGuard>) {
self.guards.push(guard);
}
pub fn len(&self) -> usize {
self.guards.len()
}
pub fn is_empty(&self) -> bool {
self.guards.is_empty()
}
pub fn last_signals(&self) -> Result<Vec<AdvisorySignal>, KernelError> {
let signals = self
.signals
.lock()
.map_err(|_| KernelError::Internal("advisory pipeline lock poisoned".to_string()))?;
Ok(signals.clone())
}
pub fn last_outputs(&self) -> Result<Vec<GuardOutput>, KernelError> {
let signals = self.last_signals()?;
Ok(signals.into_iter().map(GuardOutput::Advisory).collect())
}
}
impl Guard for AdvisoryPipeline {
fn name(&self) -> &str {
"advisory-pipeline"
}
fn evaluate(&self, ctx: &GuardContext) -> Result<Verdict, KernelError> {
let mut collected = Vec::new();
let mut should_deny = false;
for guard in &self.guards {
let signals = guard.evaluate(ctx)?;
for mut signal in signals {
if self.policy.should_promote(&signal) {
signal.promoted = true;
should_deny = true;
}
collected.push(signal);
}
}
let mut stored = self
.signals
.lock()
.map_err(|_| KernelError::Internal("advisory pipeline lock poisoned".to_string()))?;
*stored = collected;
if should_deny {
Ok(Verdict::Deny)
} else {
Ok(Verdict::Allow)
}
}
}
pub struct AnomalyAdvisoryGuard {
journal: Arc<chio_http_session::SessionJournal>,
invocation_threshold: u64,
depth_threshold: u32,
}
impl AnomalyAdvisoryGuard {
pub fn new(
journal: Arc<chio_http_session::SessionJournal>,
invocation_threshold: u64,
depth_threshold: u32,
) -> Self {
Self {
journal,
invocation_threshold,
depth_threshold,
}
}
}
impl AdvisoryGuard for AnomalyAdvisoryGuard {
fn name(&self) -> &str {
"anomaly-advisory"
}
fn evaluate(&self, ctx: &GuardContext) -> Result<Vec<AdvisorySignal>, KernelError> {
let mut signals = Vec::new();
let tool_counts = self
.journal
.tool_counts()
.map_err(|e| KernelError::Internal(format!("anomaly advisory journal error: {e}")))?;
if let Some(count) = tool_counts.get(&ctx.request.tool_name) {
if *count >= self.invocation_threshold {
signals.push(AdvisorySignal {
guard_name: "anomaly-advisory".to_string(),
description: format!(
"tool '{}' invoked {} times (threshold: {})",
ctx.request.tool_name, count, self.invocation_threshold
),
severity: if *count >= self.invocation_threshold * 2 {
AdvisorySeverity::High
} else {
AdvisorySeverity::Medium
},
metadata: Some(serde_json::json!({
"tool_name": ctx.request.tool_name,
"count": count,
"threshold": self.invocation_threshold,
})),
promoted: false,
});
}
}
let data_flow = self
.journal
.data_flow()
.map_err(|e| KernelError::Internal(format!("anomaly advisory journal error: {e}")))?;
if data_flow.max_delegation_depth >= self.depth_threshold {
signals.push(AdvisorySignal {
guard_name: "anomaly-advisory".to_string(),
description: format!(
"delegation depth {} exceeds threshold {}",
data_flow.max_delegation_depth, self.depth_threshold
),
severity: AdvisorySeverity::High,
metadata: Some(serde_json::json!({
"max_delegation_depth": data_flow.max_delegation_depth,
"threshold": self.depth_threshold,
})),
promoted: false,
});
}
Ok(signals)
}
}
pub struct DataTransferAdvisoryGuard {
journal: Arc<chio_http_session::SessionJournal>,
bytes_threshold: u64,
}
impl DataTransferAdvisoryGuard {
pub fn new(journal: Arc<chio_http_session::SessionJournal>, bytes_threshold: u64) -> Self {
Self {
journal,
bytes_threshold,
}
}
}
impl AdvisoryGuard for DataTransferAdvisoryGuard {
fn name(&self) -> &str {
"data-transfer-advisory"
}
fn evaluate(&self, _ctx: &GuardContext) -> Result<Vec<AdvisorySignal>, KernelError> {
let flow = self.journal.data_flow().map_err(|e| {
KernelError::Internal(format!("data-transfer advisory journal error: {e}"))
})?;
let total = flow
.total_bytes_read
.saturating_add(flow.total_bytes_written);
if total >= self.bytes_threshold {
let severity = if total >= self.bytes_threshold.saturating_mul(3) {
AdvisorySeverity::Critical
} else if total >= self.bytes_threshold.saturating_mul(2) {
AdvisorySeverity::High
} else {
AdvisorySeverity::Medium
};
Ok(vec![AdvisorySignal {
guard_name: "data-transfer-advisory".to_string(),
description: format!(
"cumulative data transfer {} bytes exceeds threshold {} bytes",
total, self.bytes_threshold
),
severity,
metadata: Some(serde_json::json!({
"total_bytes": total,
"bytes_read": flow.total_bytes_read,
"bytes_written": flow.total_bytes_written,
"threshold": self.bytes_threshold,
})),
promoted: false,
}])
} else {
Ok(vec![])
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use chio_http_session::{RecordParams, SessionJournal};
fn make_journal(session_id: &str) -> Arc<SessionJournal> {
Arc::new(SessionJournal::new(session_id.to_string()))
}
fn record(journal: &SessionJournal, tool: &str, bytes_read: u64, depth: u32) {
journal
.record(RecordParams {
tool_name: tool.to_string(),
server_id: "srv".to_string(),
agent_id: "agent".to_string(),
bytes_read,
bytes_written: 0,
delegation_depth: depth,
allowed: true,
})
.expect("record");
}
fn make_ctx() -> (
chio_kernel::ToolCallRequest,
chio_core::capability::ChioScope,
String,
String,
) {
let kp = chio_core::crypto::Keypair::generate();
let scope = chio_core::capability::ChioScope::default();
let agent_id = kp.public_key().to_hex();
let server_id = "srv-test".to_string();
let cap_body = chio_core::capability::CapabilityTokenBody {
id: "cap-test".to_string(),
issuer: kp.public_key(),
subject: kp.public_key(),
scope: scope.clone(),
issued_at: 0,
expires_at: u64::MAX,
delegation_chain: vec![],
};
let cap = chio_core::capability::CapabilityToken::sign(cap_body, &kp).expect("sign cap");
let request = chio_kernel::ToolCallRequest {
request_id: "req-test".to_string(),
capability: cap,
tool_name: "read_file".to_string(),
server_id: server_id.clone(),
agent_id: agent_id.clone(),
arguments: serde_json::json!({"path": "/app/src/main.rs"}),
dpop_proof: None,
governed_intent: None,
approval_token: None,
model_metadata: None,
federated_origin_kernel_id: None,
};
(request, scope, agent_id, server_id)
}
fn guard_ctx<'a>(
request: &'a chio_kernel::ToolCallRequest,
scope: &'a chio_core::capability::ChioScope,
agent_id: &'a String,
server_id: &'a String,
) -> chio_kernel::GuardContext<'a> {
chio_kernel::GuardContext {
request,
scope,
agent_id,
server_id,
session_filesystem_roots: None,
matched_grant_index: None,
}
}
#[test]
fn advisory_signal_serde_roundtrip() {
let signal = AdvisorySignal {
guard_name: "test-guard".to_string(),
description: "test observation".to_string(),
severity: AdvisorySeverity::Medium,
metadata: Some(serde_json::json!({"key": "value"})),
promoted: false,
};
let json = serde_json::to_string(&signal).expect("serialize");
let restored: AdvisorySignal = serde_json::from_str(&json).expect("deserialize");
assert_eq!(restored.guard_name, "test-guard");
assert_eq!(restored.severity, AdvisorySeverity::Medium);
assert!(!restored.promoted);
}
#[test]
fn guard_output_distinguishes_types() {
let det = GuardOutput::Deterministic {
guard_name: "forbidden-path".to_string(),
verdict: false,
details: Some("blocked".to_string()),
};
let adv = GuardOutput::Advisory(AdvisorySignal {
guard_name: "anomaly".to_string(),
description: "unusual pattern".to_string(),
severity: AdvisorySeverity::Low,
metadata: None,
promoted: false,
});
let det_json = serde_json::to_string(&det).expect("serialize det");
let adv_json = serde_json::to_string(&adv).expect("serialize adv");
assert!(det_json.contains("\"type\":\"deterministic\""));
assert!(adv_json.contains("\"type\":\"advisory\""));
}
#[test]
fn promotion_policy_empty_never_promotes() {
let policy = PromotionPolicy::new();
let signal = AdvisorySignal {
guard_name: "test".to_string(),
description: "test".to_string(),
severity: AdvisorySeverity::Critical,
metadata: None,
promoted: false,
};
assert!(!policy.should_promote(&signal));
}
#[test]
fn promotion_policy_promotes_matching_signal() {
let mut policy = PromotionPolicy::new();
policy.add_rule(PromotionRule {
guard_name: "anomaly-advisory".to_string(),
min_severity: AdvisorySeverity::High,
});
let high_signal = AdvisorySignal {
guard_name: "anomaly-advisory".to_string(),
description: "test".to_string(),
severity: AdvisorySeverity::High,
metadata: None,
promoted: false,
};
assert!(policy.should_promote(&high_signal));
let critical_signal = AdvisorySignal {
guard_name: "anomaly-advisory".to_string(),
description: "test".to_string(),
severity: AdvisorySeverity::Critical,
metadata: None,
promoted: false,
};
assert!(policy.should_promote(&critical_signal));
}
#[test]
fn promotion_policy_does_not_promote_below_threshold() {
let mut policy = PromotionPolicy::new();
policy.add_rule(PromotionRule {
guard_name: "anomaly-advisory".to_string(),
min_severity: AdvisorySeverity::High,
});
let low_signal = AdvisorySignal {
guard_name: "anomaly-advisory".to_string(),
description: "test".to_string(),
severity: AdvisorySeverity::Medium,
metadata: None,
promoted: false,
};
assert!(!policy.should_promote(&low_signal));
}
#[test]
fn promotion_policy_does_not_promote_wrong_guard() {
let mut policy = PromotionPolicy::new();
policy.add_rule(PromotionRule {
guard_name: "anomaly-advisory".to_string(),
min_severity: AdvisorySeverity::Low,
});
let signal = AdvisorySignal {
guard_name: "other-guard".to_string(),
description: "test".to_string(),
severity: AdvisorySeverity::Critical,
metadata: None,
promoted: false,
};
assert!(!policy.should_promote(&signal));
}
struct NoOpAdvisory;
impl AdvisoryGuard for NoOpAdvisory {
fn name(&self) -> &str {
"no-op"
}
fn evaluate(&self, _ctx: &GuardContext) -> Result<Vec<AdvisorySignal>, KernelError> {
Ok(vec![])
}
}
struct AlwaysSignal {
guard_name: String,
severity: AdvisorySeverity,
}
impl AdvisoryGuard for AlwaysSignal {
fn name(&self) -> &str {
&self.guard_name
}
fn evaluate(&self, _ctx: &GuardContext) -> Result<Vec<AdvisorySignal>, KernelError> {
Ok(vec![AdvisorySignal {
guard_name: self.guard_name.clone(),
description: "always signals".to_string(),
severity: self.severity,
metadata: None,
promoted: false,
}])
}
}
#[test]
fn advisory_pipeline_allows_without_promotion() {
let mut pipeline = AdvisoryPipeline::new(PromotionPolicy::new());
pipeline.add(Box::new(AlwaysSignal {
guard_name: "test-signal".to_string(),
severity: AdvisorySeverity::High,
}));
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
let result = pipeline.evaluate(&ctx).expect("ok");
assert_eq!(result, Verdict::Allow);
let signals = pipeline.last_signals().expect("signals");
assert_eq!(signals.len(), 1);
assert!(!signals[0].promoted);
}
#[test]
fn advisory_pipeline_denies_with_promotion() {
let mut policy = PromotionPolicy::new();
policy.add_rule(PromotionRule {
guard_name: "test-signal".to_string(),
min_severity: AdvisorySeverity::High,
});
let mut pipeline = AdvisoryPipeline::new(policy);
pipeline.add(Box::new(AlwaysSignal {
guard_name: "test-signal".to_string(),
severity: AdvisorySeverity::High,
}));
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
let result = pipeline.evaluate(&ctx).expect("ok");
assert_eq!(result, Verdict::Deny);
let signals = pipeline.last_signals().expect("signals");
assert_eq!(signals.len(), 1);
assert!(signals[0].promoted);
}
#[test]
fn advisory_pipeline_no_guards_allows() {
let pipeline = AdvisoryPipeline::new(PromotionPolicy::new());
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
assert_eq!(pipeline.evaluate(&ctx).expect("ok"), Verdict::Allow);
}
#[test]
fn advisory_pipeline_collects_multiple_signals() {
let mut pipeline = AdvisoryPipeline::new(PromotionPolicy::new());
pipeline.add(Box::new(AlwaysSignal {
guard_name: "signal-a".to_string(),
severity: AdvisorySeverity::Low,
}));
pipeline.add(Box::new(AlwaysSignal {
guard_name: "signal-b".to_string(),
severity: AdvisorySeverity::Medium,
}));
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
pipeline.evaluate(&ctx).expect("ok");
let signals = pipeline.last_signals().expect("signals");
assert_eq!(signals.len(), 2);
assert_eq!(signals[0].guard_name, "signal-a");
assert_eq!(signals[1].guard_name, "signal-b");
}
#[test]
fn advisory_pipeline_guard_output_types() {
let mut pipeline = AdvisoryPipeline::new(PromotionPolicy::new());
pipeline.add(Box::new(AlwaysSignal {
guard_name: "test".to_string(),
severity: AdvisorySeverity::Info,
}));
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
pipeline.evaluate(&ctx).expect("ok");
let outputs = pipeline.last_outputs().expect("outputs");
assert_eq!(outputs.len(), 1);
assert!(matches!(outputs[0], GuardOutput::Advisory(_)));
}
#[test]
fn anomaly_advisory_no_signal_below_threshold() {
let journal = make_journal("sess-anomaly-1");
for _ in 0..4 {
record(&journal, "read_file", 100, 0);
}
let guard = AnomalyAdvisoryGuard::new(journal, 10, 5);
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
let signals = guard.evaluate(&ctx).expect("ok");
assert!(signals.is_empty());
}
#[test]
fn anomaly_advisory_signals_excessive_invocations() {
let journal = make_journal("sess-anomaly-2");
for _ in 0..10 {
record(&journal, "read_file", 100, 0);
}
let guard = AnomalyAdvisoryGuard::new(journal, 5, 10);
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
let signals = guard.evaluate(&ctx).expect("ok");
assert!(!signals.is_empty());
assert!(signals.iter().any(|s| s.description.contains("read_file")));
}
#[test]
fn anomaly_advisory_signals_deep_delegation() {
let journal = make_journal("sess-anomaly-3");
record(&journal, "read_file", 100, 8);
let guard = AnomalyAdvisoryGuard::new(journal, 100, 5);
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
let signals = guard.evaluate(&ctx).expect("ok");
assert!(!signals.is_empty());
assert!(signals
.iter()
.any(|s| s.description.contains("delegation depth")));
}
#[test]
fn data_transfer_advisory_no_signal_below_threshold() {
let journal = make_journal("sess-transfer-1");
record(&journal, "read_file", 100, 0);
let guard = DataTransferAdvisoryGuard::new(journal, 10_000);
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
let signals = guard.evaluate(&ctx).expect("ok");
assert!(signals.is_empty());
}
#[test]
fn data_transfer_advisory_signals_above_threshold() {
let journal = make_journal("sess-transfer-2");
for _ in 0..20 {
record(&journal, "read_file", 1000, 0);
}
let guard = DataTransferAdvisoryGuard::new(journal, 10_000);
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
let signals = guard.evaluate(&ctx).expect("ok");
assert_eq!(signals.len(), 1);
assert!(signals[0].description.contains("data transfer"));
}
#[test]
fn data_transfer_advisory_escalating_severity() {
let journal = make_journal("sess-transfer-3");
for _ in 0..30 {
record(&journal, "read_file", 1000, 0);
}
let guard = DataTransferAdvisoryGuard::new(journal, 10_000);
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
let signals = guard.evaluate(&ctx).expect("ok");
assert_eq!(signals.len(), 1);
assert_eq!(signals[0].severity, AdvisorySeverity::Critical);
}
#[test]
fn promoted_anomaly_denies_request() {
let journal = make_journal("sess-promote");
for _ in 0..20 {
record(&journal, "read_file", 100, 0);
}
let mut policy = PromotionPolicy::new();
policy.add_rule(PromotionRule {
guard_name: "anomaly-advisory".to_string(),
min_severity: AdvisorySeverity::Medium,
});
let mut pipeline = AdvisoryPipeline::new(policy);
pipeline.add(Box::new(AnomalyAdvisoryGuard::new(journal, 5, 10)));
let (request, scope, agent_id, server_id) = make_ctx();
let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
let result = pipeline.evaluate(&ctx).expect("ok");
assert_eq!(result, Verdict::Deny, "promoted advisory should deny");
let signals = pipeline.last_signals().expect("signals");
assert!(signals.iter().any(|s| s.promoted));
}
#[test]
fn len_and_is_empty() {
let mut pipeline = AdvisoryPipeline::new(PromotionPolicy::new());
assert!(pipeline.is_empty());
assert_eq!(pipeline.len(), 0);
pipeline.add(Box::new(NoOpAdvisory));
assert!(!pipeline.is_empty());
assert_eq!(pipeline.len(), 1);
}
#[test]
fn promotion_policy_serde_roundtrip() {
let mut policy = PromotionPolicy::new();
policy.add_rule(PromotionRule {
guard_name: "anomaly-advisory".to_string(),
min_severity: AdvisorySeverity::High,
});
let json = serde_json::to_string(&policy).expect("serialize");
let restored: PromotionPolicy = serde_json::from_str(&json).expect("deserialize");
assert_eq!(restored.rules.len(), 1);
assert_eq!(restored.rules[0].guard_name, "anomaly-advisory");
assert_eq!(restored.rules[0].min_severity, AdvisorySeverity::High);
}
}