1#![allow(dead_code)]
11
12use std::collections::{HashMap, HashSet};
13use std::time::Instant;
14
15use hmac::{Hmac, Mac};
16use sha2::{Digest, Sha256};
17
18use crate::handlers::base::{HandlerError, LambdaEnvelope, make_envelope};
19use crate::ir_nodes::IRReflex;
20
21use super::health_report::{HealthReport, level_at_least};
22
23type HmacSha256 = Hmac<Sha256>;
24
25const KNOWN_ACTIONS: &[&str] = &[
26 "drop", "revoke", "emit", "redact", "quarantine", "terminate", "alert",
27];
28
29#[derive(Debug, Clone)]
31pub struct ReflexOutcome {
32 pub reflex_name: String,
33 pub action: String,
34 pub fired: bool,
35 pub reason: String,
36 pub target_signature: String,
37 pub latency_us: f64,
38 pub envelope: LambdaEnvelope,
39 pub signed_trace: String,
40}
41
42fn sign(message: &str, secret: &[u8]) -> String {
43 let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC key of any length");
44 mac.update(message.as_bytes());
45 let hex = format!("{:x}", mac.finalize().into_bytes());
46 hex[..32.min(hex.len())].to_string()
47}
48
49pub struct ReflexEngine {
52 reflexes: HashMap<String, IRReflex>,
53 fired: HashSet<(String, String)>,
54 trace_secret: Vec<u8>,
55}
56
57impl ReflexEngine {
58 pub fn new() -> Self {
59 let mut h = Sha256::new();
60 h.update(b"axon-reflex-engine-default-secret");
61 ReflexEngine::with_secret(h.finalize().to_vec())
62 }
63
64 pub fn with_secret(trace_secret: Vec<u8>) -> Self {
65 ReflexEngine {
66 reflexes: HashMap::new(),
67 fired: HashSet::new(),
68 trace_secret,
69 }
70 }
71
72 pub fn register(&mut self, reflex: IRReflex) -> Result<(), HandlerError> {
73 if !KNOWN_ACTIONS.contains(&reflex.action.as_str()) {
74 return Err(HandlerError::callee(format!(
75 "reflex '{}' declares unknown action '{}'. Engine knows: {}",
76 reflex.name,
77 reflex.action,
78 KNOWN_ACTIONS.join(", ")
79 )));
80 }
81 self.reflexes.insert(reflex.name.clone(), reflex);
82 Ok(())
83 }
84
85 pub fn dispatch(&mut self, report: &HealthReport) -> Vec<ReflexOutcome> {
88 let mut outs = Vec::new();
89 let candidates: Vec<IRReflex> = self
91 .reflexes
92 .values()
93 .filter(|r| r.trigger == report.immune_name)
94 .cloned()
95 .collect();
96 for reflex in candidates {
97 outs.push(self.maybe_fire(&reflex, report));
98 }
99 outs
100 }
101
102 pub fn clear_idempotency(&mut self) {
104 self.fired.clear();
105 }
106
107 fn maybe_fire(&mut self, reflex: &IRReflex, report: &HealthReport) -> ReflexOutcome {
108 let start = Instant::now();
109 if !level_at_least(&report.classification, &reflex.on_level) {
110 return self.noop(
111 reflex,
112 report,
113 start,
114 format!(
115 "level '{}' below threshold '{}'",
116 report.classification, reflex.on_level
117 ),
118 );
119 }
120 let key_sig = if report.anomaly_signature.is_empty() {
121 report.immune_name.clone()
122 } else {
123 report.anomaly_signature.clone()
124 };
125 let key = (reflex.name.clone(), key_sig);
126 if self.fired.contains(&key) {
127 return self.noop(
128 reflex,
129 report,
130 start,
131 "idempotent skip (already fired for this signature)".into(),
132 );
133 }
134 self.fired.insert(key);
135 let latency_us = start.elapsed().as_secs_f64() * 1e6;
138 let payload = format!(
139 "{}|{}|{}|{}|{:.6}",
140 reflex.name,
141 reflex.action,
142 report.anomaly_signature,
143 report.classification,
144 report.kl_divergence,
145 );
146 ReflexOutcome {
147 reflex_name: reflex.name.clone(),
148 action: reflex.action.clone(),
149 fired: true,
150 reason: format!(
151 "level '{}' ≥ threshold '{}'",
152 report.classification, reflex.on_level
153 ),
154 target_signature: report.anomaly_signature.clone(),
155 latency_us,
156 envelope: make_envelope(
157 report.envelope.c,
158 &format!("reflex:{}", reflex.name),
159 "observed",
160 None,
161 ),
162 signed_trace: sign(&payload, &self.trace_secret),
163 }
164 }
165
166 fn noop(
167 &self,
168 reflex: &IRReflex,
169 report: &HealthReport,
170 start: Instant,
171 reason: String,
172 ) -> ReflexOutcome {
173 let latency_us = start.elapsed().as_secs_f64() * 1e6;
174 let payload = format!(
175 "{}|NOOP|{}|{}",
176 reflex.name, report.anomaly_signature, reason
177 );
178 ReflexOutcome {
179 reflex_name: reflex.name.clone(),
180 action: reflex.action.clone(),
181 fired: false,
182 reason,
183 target_signature: report.anomaly_signature.clone(),
184 latency_us,
185 envelope: make_envelope(
186 report.envelope.c,
187 &format!("reflex:{}", reflex.name),
188 "observed",
189 None,
190 ),
191 signed_trace: sign(&payload, &self.trace_secret),
192 }
193 }
194}
195
196impl Default for ReflexEngine {
197 fn default() -> Self { Self::new() }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203 use super::super::health_report::make_health_report;
204 use crate::handlers::base::HandlerErrorKind;
205
206 fn mk_reflex(name: &str, trigger: &str, on_level: &str, action: &str) -> IRReflex {
207 IRReflex {
208 node_type: "reflex",
209 source_line: 1,
210 source_column: 1,
211 name: name.into(),
212 trigger: trigger.into(),
213 on_level: on_level.into(),
214 action: action.into(),
215 scope: "tenant".into(),
216 sla: "1ms".into(),
217 }
218 }
219
220 fn mk_report(immune: &str, level: &str, sig: &str) -> HealthReport {
221 let kl = match level {
222 "know" => 0.1,
223 "believe" => 0.45,
224 "speculate" => 0.75,
225 "doubt" => 0.95,
226 _ => 0.0,
227 };
228 let mut r = make_health_report(
229 immune, kl, vec!["Health".into()], sig, 300.0, "exponential", "immune:I",
230 );
231 r.classification = level.into();
233 r
234 }
235
236 #[test]
237 fn register_rejects_unknown_action() {
238 let mut eng = ReflexEngine::new();
239 let bad = mk_reflex("R", "I", "doubt", "yeet");
240 let err = eng.register(bad).unwrap_err();
241 assert_eq!(err.kind, HandlerErrorKind::Callee);
242 }
243
244 #[test]
245 fn dispatch_fires_reflex_at_or_above_threshold() {
246 let mut eng = ReflexEngine::new();
247 eng.register(mk_reflex("Stop", "I", "believe", "quarantine")).unwrap();
248 let report = mk_report("I", "speculate", "sig-1");
249 let outs = eng.dispatch(&report);
250 assert_eq!(outs.len(), 1);
251 assert!(outs[0].fired);
252 assert!(!outs[0].signed_trace.is_empty());
253 }
254
255 #[test]
256 fn dispatch_does_not_fire_below_threshold() {
257 let mut eng = ReflexEngine::new();
258 eng.register(mk_reflex("Stop", "I", "doubt", "quarantine")).unwrap();
259 let report = mk_report("I", "believe", "sig-1");
260 let outs = eng.dispatch(&report);
261 assert_eq!(outs.len(), 1);
262 assert!(!outs[0].fired);
263 assert!(outs[0].reason.contains("below threshold"));
264 }
265
266 #[test]
267 fn dispatch_is_idempotent_on_same_signature() {
268 let mut eng = ReflexEngine::new();
269 eng.register(mk_reflex("Stop", "I", "doubt", "quarantine")).unwrap();
270 let report = mk_report("I", "doubt", "sig-x");
271 let first = eng.dispatch(&report);
272 let second = eng.dispatch(&report);
273 assert!(first[0].fired);
274 assert!(!second[0].fired);
275 assert!(second[0].reason.contains("idempotent"));
276 }
277
278 #[test]
279 fn dispatch_only_triggers_on_matching_immune_name() {
280 let mut eng = ReflexEngine::new();
281 eng.register(mk_reflex("R_other", "OtherSensor", "know", "alert")).unwrap();
282 let report = mk_report("I", "doubt", "sig-m");
283 assert!(eng.dispatch(&report).is_empty());
284 }
285
286 #[test]
287 fn signed_trace_differs_per_firing_payload() {
288 let mut eng = ReflexEngine::new();
289 eng.register(mk_reflex("R", "I", "know", "alert")).unwrap();
290 let r1 = mk_report("I", "know", "sig-A");
291 let r2 = mk_report("I", "know", "sig-B");
292 let a = eng.dispatch(&r1).into_iter().next().unwrap();
293 let b = eng.dispatch(&r2).into_iter().next().unwrap();
294 assert_ne!(a.signed_trace, b.signed_trace);
295 }
296
297 #[test]
298 fn latency_is_small_and_non_negative() {
299 let mut eng = ReflexEngine::new();
300 eng.register(mk_reflex("R", "I", "know", "alert")).unwrap();
301 let report = mk_report("I", "doubt", "sig-l");
302 let out = eng.dispatch(&report).into_iter().next().unwrap();
303 assert!(out.latency_us >= 0.0);
304 assert!(out.latency_us < 5_000.0);
306 }
307}