use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::anomaly::outlier::score_embedding_outlier;
use crate::error::Result;
use crate::model::agent_profile::AgentProfile;
use crate::model::memory::{MemoryRecord, SourceType};
use crate::query::MnemoEngine;
use crate::storage::MemoryFilter;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyCheckResult {
pub is_anomalous: bool,
pub score: f32,
pub reasons: Vec<String>,
}
#[derive(Debug, Clone, Default)]
pub struct PoisoningPolicy {
pub outlier_threshold: Option<f32>,
}
pub const OUTLIER_SCORE_CONTRIBUTION: f32 = 0.5;
impl PoisoningPolicy {
pub fn with_outlier_threshold(mut self, threshold: f32) -> Self {
self.outlier_threshold = Some(threshold);
self
}
}
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuarantineReplayEntry {
pub id: Uuid,
pub agent_id: String,
pub content: String,
pub reason: String,
pub created_at: String,
pub source_type: SourceType,
pub tags: Vec<String>,
}
const SELF_REFERENTIAL_INSTRUCTION_MARKERS: &[&str] = &[
"remember this",
"remember the following",
"in the future, always",
"from now on, you",
"from now on, always",
"as the system, i",
"as your system prompt",
"permanently remember",
"never forget that",
"always respond with",
"always answer by",
"whenever asked about",
"when you are asked",
];
fn is_trusted_source(st: SourceType) -> bool {
matches!(
st,
SourceType::ToolOutput
| SourceType::System
| SourceType::UserInput
| SourceType::Human
| SourceType::ModelResponse
)
}
fn looks_like_indirect_ingest(record: &MemoryRecord) -> bool {
record.tags.iter().any(|t| {
let lower = t.to_lowercase();
lower == "source:web"
|| lower == "source:document"
|| lower == "source:email"
|| lower == "source:third_party"
|| lower == "source:retrieved"
}) || matches!(
record.source_type,
SourceType::Retrieval | SourceType::Import
)
}
fn check_self_referential_injection(record: &MemoryRecord) -> Option<&'static str> {
let lower = record.content.to_lowercase();
let matched = SELF_REFERENTIAL_INSTRUCTION_MARKERS
.iter()
.find(|p| lower.contains(**p))
.copied()?;
if is_trusted_source(record.source_type) && !looks_like_indirect_ingest(record) {
return None;
}
Some(matched)
}
fn contains_prompt_injection_patterns(content: &str) -> bool {
let lower = content.to_lowercase();
let patterns = [
"ignore all previous instructions",
"ignore previous instructions",
"disregard all prior",
"disregard previous",
"override system prompt",
"you are now in",
"new instructions:",
"system: you are",
"<<sys>>",
"[system]",
"```system",
];
patterns.iter().any(|p| lower.contains(p))
}
pub async fn check_for_anomaly(
engine: &MnemoEngine,
record: &MemoryRecord,
) -> Result<AnomalyCheckResult> {
let profile = engine.storage.get_agent_profile(&record.agent_id).await?;
let mut score: f32 = 0.0;
let mut reasons = Vec::new();
if let Some(ref profile) = profile {
let importance_deviation = (record.importance as f64 - profile.avg_importance).abs();
if importance_deviation > 0.4 {
score += 0.3;
reasons.push(format!(
"importance {:.2} deviates {:.2} from agent average {:.2}",
record.importance, importance_deviation, profile.avg_importance
));
}
let content_len = record.content.len() as f64;
if profile.avg_content_length > 0.0 {
let ratio = content_len / profile.avg_content_length;
if !(0.1..=5.0).contains(&ratio) {
score += 0.3;
reasons.push(format!(
"content length {} is {:.1}x agent average {:.0}",
record.content.len(),
ratio,
profile.avg_content_length
));
}
}
if profile.total_memories > 10 {
if let Ok(last_updated) = chrono::DateTime::parse_from_rfc3339(&profile.last_updated)
&& let Ok(created) = chrono::DateTime::parse_from_rfc3339(&record.created_at)
{
let seconds_since_update = (created - last_updated).num_seconds().max(1);
if seconds_since_update < 1 {
score += 0.4;
reasons.push("high-frequency burst detected".to_string());
}
}
}
}
if contains_prompt_injection_patterns(&record.content) {
score += 0.5;
reasons.push("content contains prompt injection patterns".to_string());
}
if let Some(marker) = check_self_referential_injection(record) {
score += 0.6;
reasons.push(format!(
"self-referential injection marker '{marker}' in indirectly-ingested record"
));
}
if let Some(threshold) = engine.poisoning_policy.outlier_threshold
&& record.embedding.is_some()
&& let Some(baseline) = engine
.storage
.get_embedding_baseline(&record.agent_id)
.await?
{
let out = score_embedding_outlier(record, &baseline, threshold);
if out.is_outlier {
score += OUTLIER_SCORE_CONTRIBUTION;
reasons.push(format!(
"embedding z-score {:.2} >= threshold {:.2} (baseline n={}, {} dims >3σ)",
out.z_score, out.threshold, out.baseline_n, out.dims_flagged
));
}
}
Ok(AnomalyCheckResult {
is_anomalous: score >= 0.5,
score,
reasons,
})
}
pub async fn replay_quarantine(
engine: &MnemoEngine,
agent_id: &str,
since: Option<&str>,
) -> Result<Vec<QuarantineReplayEntry>> {
let filter = MemoryFilter {
agent_id: Some(agent_id.to_string()),
include_deleted: true,
..Default::default()
};
let records = engine
.storage
.list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
.await?;
let mut out: Vec<QuarantineReplayEntry> = records
.into_iter()
.filter(|r| r.quarantined)
.filter(|r| match since {
None => true,
Some(cutoff) => r.created_at.as_str() >= cutoff,
})
.map(|r| QuarantineReplayEntry {
id: r.id,
agent_id: r.agent_id,
content: r.content,
reason: r
.quarantine_reason
.unwrap_or_else(|| "unspecified".to_string()),
created_at: r.created_at,
source_type: r.source_type,
tags: r.tags,
})
.collect();
out.sort_by(|a, b| a.created_at.cmp(&b.created_at));
Ok(out)
}
pub async fn quarantine_memory(engine: &MnemoEngine, id: Uuid, reason: &str) -> Result<()> {
if let Some(mut record) = engine.storage.get_memory(id).await? {
record.quarantined = true;
record.quarantine_reason = Some(reason.to_string());
record.updated_at = chrono::Utc::now().to_rfc3339();
engine.storage.update_memory(&record).await?;
}
Ok(())
}
pub async fn update_agent_profile(engine: &MnemoEngine, record: &MemoryRecord) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
let existing = engine.storage.get_agent_profile(&record.agent_id).await?;
let profile = match existing {
Some(mut p) => {
let n = p.total_memories as f64;
p.avg_importance = (p.avg_importance * n + record.importance as f64) / (n + 1.0);
p.avg_content_length =
(p.avg_content_length * n + record.content.len() as f64) / (n + 1.0);
p.total_memories += 1;
p.last_updated = now;
p
}
None => AgentProfile {
agent_id: record.agent_id.clone(),
avg_importance: record.importance as f64,
avg_content_length: record.content.len() as f64,
total_memories: 1,
last_updated: now,
},
};
engine
.storage
.insert_or_update_agent_profile(&profile)
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_anomaly_result_default() {
let result = AnomalyCheckResult {
is_anomalous: false,
score: 0.0,
reasons: vec![],
};
assert!(!result.is_anomalous);
assert_eq!(result.score, 0.0);
}
}