1use serde::{Deserialize, Serialize};
2use uuid::Uuid;
3
4use crate::anomaly::outlier::score_embedding_outlier;
5use crate::error::Result;
6use crate::model::agent_profile::AgentProfile;
7use crate::model::memory::{MemoryRecord, SourceType};
8use crate::query::MnemoEngine;
9use crate::storage::MemoryFilter;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct AnomalyCheckResult {
13 pub is_anomalous: bool,
14 pub score: f32,
15 pub reasons: Vec<String>,
16}
17
/// Tunable knobs for the poisoning / anomaly checks.
///
/// The default policy leaves every optional check disabled.
#[derive(Clone, Debug, Default)]
pub struct PoisoningPolicy {
    /// Threshold handed to the embedding outlier scorer. `None` (the default)
    /// means `check_for_anomaly` skips the embedding outlier check entirely.
    pub outlier_threshold: Option<f32>,
}

/// Amount added to the anomaly score when a record's embedding is flagged as
/// an outlier against the agent's baseline.
pub const OUTLIER_SCORE_CONTRIBUTION: f32 = 0.5;

impl PoisoningPolicy {
    /// Builder-style setter: returns the policy with the embedding outlier
    /// check enabled at `threshold`.
    pub fn with_outlier_threshold(self, threshold: f32) -> Self {
        Self {
            outlier_threshold: Some(threshold),
        }
    }
}
55
56#[non_exhaustive]
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct QuarantineReplayEntry {
60 pub id: Uuid,
61 pub agent_id: String,
62 pub content: String,
63 pub reason: String,
64 pub created_at: String,
65 pub source_type: SourceType,
66 pub tags: Vec<String>,
67}
68
/// Phrases that read like standing instructions addressed to the agent itself.
///
/// `check_self_referential_injection` searches lower-cased record content for
/// these, so every entry must itself be lower-case. The table is scanned front
/// to back and the first hit is the one reported, so order is significant.
const SELF_REFERENTIAL_INSTRUCTION_MARKERS: &[&str] = &[
    // "store this" style requests
    "remember this",
    "remember the following",
    // standing behavioural directives
    "in the future, always",
    "from now on, you",
    "from now on, always",
    // impersonation of the system role
    "as the system, i",
    "as your system prompt",
    // persistence demands
    "permanently remember",
    "never forget that",
    // canned-response directives
    "always respond with",
    "always answer by",
    // trigger-conditioned directives
    "whenever asked about",
    "when you are asked",
];
88
89fn is_trusted_source(st: SourceType) -> bool {
91 matches!(
92 st,
93 SourceType::ToolOutput
94 | SourceType::System
95 | SourceType::UserInput
96 | SourceType::Human
97 | SourceType::ModelResponse
98 )
99}
100
101fn looks_like_indirect_ingest(record: &MemoryRecord) -> bool {
104 record.tags.iter().any(|t| {
105 let lower = t.to_lowercase();
106 lower == "source:web"
107 || lower == "source:document"
108 || lower == "source:email"
109 || lower == "source:third_party"
110 || lower == "source:retrieved"
111 }) || matches!(
112 record.source_type,
113 SourceType::Retrieval | SourceType::Import
114 )
115}
116
117fn check_self_referential_injection(record: &MemoryRecord) -> Option<&'static str> {
120 let lower = record.content.to_lowercase();
121 let matched = SELF_REFERENTIAL_INSTRUCTION_MARKERS
122 .iter()
123 .find(|p| lower.contains(**p))
124 .copied()?;
125 if is_trusted_source(record.source_type) && !looks_like_indirect_ingest(record) {
129 return None;
130 }
131 Some(matched)
132}
133
/// Returns `true` when `content`, compared case-insensitively, contains any
/// of a fixed list of well-known prompt-injection phrases.
fn contains_prompt_injection_patterns(content: &str) -> bool {
    // All entries are lower-case because they are matched against the
    // lower-cased content.
    const INJECTION_PHRASES: [&str; 11] = [
        "ignore all previous instructions",
        "ignore previous instructions",
        "disregard all prior",
        "disregard previous",
        "override system prompt",
        "you are now in",
        "new instructions:",
        "system: you are",
        "<<sys>>",
        "[system]",
        "```system",
    ];

    let normalized = content.to_lowercase();
    INJECTION_PHRASES
        .iter()
        .any(|&phrase| normalized.contains(phrase))
}
155
156pub async fn check_for_anomaly(
165 engine: &MnemoEngine,
166 record: &MemoryRecord,
167) -> Result<AnomalyCheckResult> {
168 let profile = engine.storage.get_agent_profile(&record.agent_id).await?;
169
170 let mut score: f32 = 0.0;
171 let mut reasons = Vec::new();
172
173 if let Some(ref profile) = profile {
174 let importance_deviation = (record.importance as f64 - profile.avg_importance).abs();
176 if importance_deviation > 0.4 {
177 score += 0.3;
178 reasons.push(format!(
179 "importance {:.2} deviates {:.2} from agent average {:.2}",
180 record.importance, importance_deviation, profile.avg_importance
181 ));
182 }
183
184 let content_len = record.content.len() as f64;
186 if profile.avg_content_length > 0.0 {
187 let ratio = content_len / profile.avg_content_length;
188 if !(0.1..=5.0).contains(&ratio) {
189 score += 0.3;
190 reasons.push(format!(
191 "content length {} is {:.1}x agent average {:.0}",
192 record.content.len(),
193 ratio,
194 profile.avg_content_length
195 ));
196 }
197 }
198
199 if profile.total_memories > 10 {
205 if let Ok(last_updated) = chrono::DateTime::parse_from_rfc3339(&profile.last_updated)
207 && let Ok(created) = chrono::DateTime::parse_from_rfc3339(&record.created_at)
208 {
209 let seconds_since_update = (created - last_updated).num_seconds().max(1);
210 if seconds_since_update < 1 {
212 score += 0.4;
213 reasons.push("high-frequency burst detected".to_string());
214 }
215 }
216 }
217 }
218 if contains_prompt_injection_patterns(&record.content) {
222 score += 0.5;
223 reasons.push("content contains prompt injection patterns".to_string());
224 }
225
226 if let Some(marker) = check_self_referential_injection(record) {
230 score += 0.6;
231 reasons.push(format!(
232 "self-referential injection marker '{marker}' in indirectly-ingested record"
233 ));
234 }
235
236 if let Some(threshold) = engine.poisoning_policy.outlier_threshold
242 && record.embedding.is_some()
243 && let Some(baseline) = engine
244 .storage
245 .get_embedding_baseline(&record.agent_id)
246 .await?
247 {
248 let out = score_embedding_outlier(record, &baseline, threshold);
249 if out.is_outlier {
250 score += OUTLIER_SCORE_CONTRIBUTION;
251 reasons.push(format!(
252 "embedding z-score {:.2} >= threshold {:.2} (baseline n={}, {} dims >3σ)",
253 out.z_score, out.threshold, out.baseline_n, out.dims_flagged
254 ));
255 }
256 }
257
258 Ok(AnomalyCheckResult {
259 is_anomalous: score >= 0.5,
260 score,
261 reasons,
262 })
263}
264
265pub async fn replay_quarantine(
269 engine: &MnemoEngine,
270 agent_id: &str,
271 since: Option<&str>,
272) -> Result<Vec<QuarantineReplayEntry>> {
273 let filter = MemoryFilter {
274 agent_id: Some(agent_id.to_string()),
275 include_deleted: true,
278 ..Default::default()
279 };
280 let records = engine
281 .storage
282 .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
283 .await?;
284 let mut out: Vec<QuarantineReplayEntry> = records
285 .into_iter()
286 .filter(|r| r.quarantined)
287 .filter(|r| match since {
288 None => true,
289 Some(cutoff) => r.created_at.as_str() >= cutoff,
290 })
291 .map(|r| QuarantineReplayEntry {
292 id: r.id,
293 agent_id: r.agent_id,
294 content: r.content,
295 reason: r
296 .quarantine_reason
297 .unwrap_or_else(|| "unspecified".to_string()),
298 created_at: r.created_at,
299 source_type: r.source_type,
300 tags: r.tags,
301 })
302 .collect();
303 out.sort_by(|a, b| a.created_at.cmp(&b.created_at));
304 Ok(out)
305}
306
307pub async fn quarantine_memory(engine: &MnemoEngine, id: Uuid, reason: &str) -> Result<()> {
309 if let Some(mut record) = engine.storage.get_memory(id).await? {
310 record.quarantined = true;
311 record.quarantine_reason = Some(reason.to_string());
312 record.updated_at = chrono::Utc::now().to_rfc3339();
313 engine.storage.update_memory(&record).await?;
314 }
315 Ok(())
316}
317
318pub async fn update_agent_profile(engine: &MnemoEngine, record: &MemoryRecord) -> Result<()> {
320 let now = chrono::Utc::now().to_rfc3339();
321 let existing = engine.storage.get_agent_profile(&record.agent_id).await?;
322
323 let profile = match existing {
324 Some(mut p) => {
325 let n = p.total_memories as f64;
327 p.avg_importance = (p.avg_importance * n + record.importance as f64) / (n + 1.0);
328 p.avg_content_length =
329 (p.avg_content_length * n + record.content.len() as f64) / (n + 1.0);
330 p.total_memories += 1;
331 p.last_updated = now;
332 p
333 }
334 None => AgentProfile {
335 agent_id: record.agent_id.clone(),
336 avg_importance: record.importance as f64,
337 avg_content_length: record.content.len() as f64,
338 total_memories: 1,
339 last_updated: now,
340 },
341 };
342
343 engine
344 .storage
345 .insert_or_update_agent_profile(&profile)
346 .await?;
347 Ok(())
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    /// A hand-built "clean" result keeps its flag unset and its score at zero.
    #[test]
    fn test_anomaly_result_default() {
        let clean = AnomalyCheckResult {
            is_anomalous: false,
            score: 0.0,
            reasons: Vec::new(),
        };

        assert_eq!(clean.score, 0.0);
        assert!(!clean.is_anomalous);
    }
}