Skip to main content

roboticus_api/
abuse.rs

1//! Abuse signal aggregation and enforcement.
2//!
3//! The `AbuseTracker` correlates rate-limit hits, policy violations, and
4//! anomalous patterns across actors, origins, and channels. It produces
5//! graduated enforcement actions (allow / slowdown / quarantine) and
6//! persists an audit trail to the `abuse_events` table.
7//!
8//! Lives in `AppState` as `Arc<RwLock<AbuseTracker>>` so both API and
9//! channel entry points share the same abuse view.
10
11use std::collections::HashMap;
12use std::time::{Duration, Instant};
13
14use tracing::{info, warn};
15
16/// Action the system should take for a given request.
17#[derive(Debug, Clone, PartialEq)]
18pub enum AbuseAction {
19    /// Request proceeds normally.
20    Allow,
21    /// Request proceeds after an artificial delay.
22    Slowdown(Duration),
23    /// Request is rejected outright.
24    Quarantine(Duration),
25}
26
27/// Categories of abuse signals fed into the tracker.
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub enum SignalType {
30    /// HTTP/channel rate limit was hit or nearly hit.
31    RateBurst,
32    /// PolicyEngine denied a tool call.
33    PolicyViolation,
34    /// Repeated identical or near-identical messages.
35    RepetitionSpam,
36    /// Rapid session creation without meaningful interaction.
37    SessionChurn,
38    /// Requests targeting admin/sensitive endpoints.
39    SensitiveProbe,
40}
41
42impl SignalType {
43    pub fn as_str(&self) -> &'static str {
44        match self {
45            Self::RateBurst => "rate_burst",
46            Self::PolicyViolation => "policy_violation",
47            Self::RepetitionSpam => "repetition_spam",
48            Self::SessionChurn => "session_churn",
49            Self::SensitiveProbe => "sensitive_probe",
50        }
51    }
52}
53
54/// Per-actor abuse signal accumulator with time-decay.
55#[derive(Debug, Clone)]
56struct ActorSignals {
57    /// (signal_type, timestamp) pairs — oldest pruned on evaluate.
58    events: Vec<(SignalType, Instant)>,
59    /// Currently active quarantine expiry, if any.
60    quarantine_until: Option<Instant>,
61    /// Currently active slowdown expiry, if any.
62    slowdown_until: Option<Instant>,
63}
64
65impl ActorSignals {
66    fn new() -> Self {
67        Self {
68            events: Vec::new(),
69            quarantine_until: None,
70            slowdown_until: None,
71        }
72    }
73
74    fn prune_stale(&mut self, window: Duration) {
75        let cutoff = Instant::now() - window;
76        self.events.retain(|(_, ts)| *ts > cutoff);
77    }
78}
79
80/// Configurable thresholds for abuse scoring.
81#[derive(Debug, Clone)]
82pub struct AbuseConfig {
83    /// Time window over which signals are aggregated.
84    pub window: Duration,
85    /// Score threshold above which slowdown is applied.
86    pub slowdown_threshold: f64,
87    /// Score threshold above which quarantine is applied.
88    pub quarantine_threshold: f64,
89    /// Duration of slowdown penalty.
90    pub slowdown_duration: Duration,
91    /// Duration of quarantine penalty.
92    pub quarantine_duration: Duration,
93    /// Maximum tracked actors (LRU eviction of oldest inactive).
94    pub max_tracked_actors: usize,
95}
96
97impl Default for AbuseConfig {
98    fn default() -> Self {
99        Self {
100            window: Duration::from_secs(300),
101            slowdown_threshold: 0.5,
102            quarantine_threshold: 0.8,
103            slowdown_duration: Duration::from_secs(5),
104            quarantine_duration: Duration::from_secs(60),
105            max_tracked_actors: 10_000,
106        }
107    }
108}
109
110/// Correlates abuse signals across actors and produces enforcement actions.
111#[derive(Debug)]
112pub struct AbuseTracker {
113    config: AbuseConfig,
114    actors: HashMap<String, ActorSignals>,
115}
116
117impl AbuseTracker {
118    pub fn new(config: AbuseConfig) -> Self {
119        Self {
120            config,
121            actors: HashMap::new(),
122        }
123    }
124
125    /// Record an abuse signal for the given actor.
126    pub fn record_signal(&mut self, actor_id: &str, signal: SignalType) {
127        let entry = self
128            .actors
129            .entry(actor_id.to_string())
130            .or_insert_with(ActorSignals::new);
131        entry.events.push((signal, Instant::now()));
132
133        // Evict oldest actors if we exceed the cap
134        if self.actors.len() > self.config.max_tracked_actors {
135            self.evict_oldest();
136        }
137    }
138
139    /// Evaluate the current abuse score for an actor and return the
140    /// appropriate enforcement action.
141    pub fn evaluate(&mut self, actor_id: &str) -> (AbuseAction, f64) {
142        let entry = match self.actors.get_mut(actor_id) {
143            Some(e) => e,
144            None => return (AbuseAction::Allow, 0.0),
145        };
146
147        // Check active quarantine
148        if let Some(until) = entry.quarantine_until {
149            if Instant::now() < until {
150                return (AbuseAction::Quarantine(until - Instant::now()), 1.0);
151            }
152            entry.quarantine_until = None;
153        }
154
155        // Check active slowdown
156        if let Some(until) = entry.slowdown_until {
157            if Instant::now() < until {
158                return (
159                    AbuseAction::Slowdown(until - Instant::now()),
160                    self.config.slowdown_threshold,
161                );
162            }
163            entry.slowdown_until = None;
164        }
165
166        // Prune stale signals and score
167        entry.prune_stale(self.config.window);
168        let score = Self::compute_score(&entry.events);
169
170        if score >= self.config.quarantine_threshold {
171            let until = Instant::now() + self.config.quarantine_duration;
172            entry.quarantine_until = Some(until);
173            warn!(actor = %actor_id, score, "abuse quarantine triggered");
174            (
175                AbuseAction::Quarantine(self.config.quarantine_duration),
176                score,
177            )
178        } else if score >= self.config.slowdown_threshold {
179            let until = Instant::now() + self.config.slowdown_duration;
180            entry.slowdown_until = Some(until);
181            info!(actor = %actor_id, score, "abuse slowdown triggered");
182            (AbuseAction::Slowdown(self.config.slowdown_duration), score)
183        } else {
184            (AbuseAction::Allow, score)
185        }
186    }
187
188    /// Compute a 0.0–1.0 abuse score from recent signals.
189    ///
190    /// Signal weights:
191    /// - RateBurst: 0.15 each
192    /// - PolicyViolation: 0.25 each
193    /// - RepetitionSpam: 0.10 each
194    /// - SessionChurn: 0.10 each
195    /// - SensitiveProbe: 0.30 each
196    ///
197    /// Score is clamped to [0.0, 1.0].
198    fn compute_score(events: &[(SignalType, Instant)]) -> f64 {
199        let mut total: f64 = 0.0;
200        for (sig, _) in events {
201            total += match sig {
202                SignalType::RateBurst => 0.15,
203                SignalType::PolicyViolation => 0.25,
204                SignalType::RepetitionSpam => 0.10,
205                SignalType::SessionChurn => 0.10,
206                SignalType::SensitiveProbe => 0.30,
207            };
208        }
209        total.min(1.0)
210    }
211
212    /// Remove the oldest inactive actor to stay within cap.
213    fn evict_oldest(&mut self) {
214        let oldest = self
215            .actors
216            .iter()
217            .filter_map(|(id, sig)| sig.events.last().map(|(_, ts)| (id.clone(), *ts)))
218            .min_by_key(|(_, ts)| *ts);
219        if let Some((id, _)) = oldest {
220            self.actors.remove(&id);
221        }
222    }
223
224    /// Operator-visible summary of tracked actors and their scores.
225    pub fn snapshot(&mut self) -> Vec<(String, f64, usize)> {
226        let window = self.config.window;
227        self.actors
228            .iter_mut()
229            .map(|(id, sig)| {
230                sig.prune_stale(window);
231                let score = Self::compute_score(&sig.events);
232                (id.clone(), score, sig.events.len())
233            })
234            .collect()
235    }
236}
237
238#[cfg(test)]
239mod tests {
240    use super::*;
241
242    fn test_config() -> AbuseConfig {
243        AbuseConfig {
244            window: Duration::from_secs(60),
245            slowdown_threshold: 0.4,
246            quarantine_threshold: 0.7,
247            slowdown_duration: Duration::from_millis(100),
248            quarantine_duration: Duration::from_millis(500),
249            max_tracked_actors: 100,
250        }
251    }
252
253    #[test]
254    fn clean_actor_is_allowed() {
255        let mut tracker = AbuseTracker::new(test_config());
256        let (action, score) = tracker.evaluate("clean-user");
257        assert_eq!(action, AbuseAction::Allow);
258        assert!((score - 0.0).abs() < f64::EPSILON);
259    }
260
261    #[test]
262    fn single_signal_below_threshold() {
263        let mut tracker = AbuseTracker::new(test_config());
264        tracker.record_signal("user-1", SignalType::RateBurst);
265        let (action, score) = tracker.evaluate("user-1");
266        assert_eq!(action, AbuseAction::Allow);
267        assert!((score - 0.15).abs() < f64::EPSILON);
268    }
269
270    #[test]
271    fn accumulated_signals_trigger_slowdown() {
272        let mut tracker = AbuseTracker::new(test_config());
273        // 3 × RateBurst = 0.45 → above slowdown (0.4), below quarantine (0.7)
274        for _ in 0..3 {
275            tracker.record_signal("user-2", SignalType::RateBurst);
276        }
277        let (action, score) = tracker.evaluate("user-2");
278        assert!(matches!(action, AbuseAction::Slowdown(_)));
279        assert!((score - 0.45).abs() < f64::EPSILON);
280    }
281
282    #[test]
283    fn severe_signals_trigger_quarantine() {
284        let mut tracker = AbuseTracker::new(test_config());
285        // SensitiveProbe(0.30) + PolicyViolation(0.25) + RateBurst(0.15) = 0.70
286        tracker.record_signal("attacker", SignalType::SensitiveProbe);
287        tracker.record_signal("attacker", SignalType::PolicyViolation);
288        tracker.record_signal("attacker", SignalType::RateBurst);
289        let (action, score) = tracker.evaluate("attacker");
290        assert!(matches!(action, AbuseAction::Quarantine(_)));
291        assert!((score - 0.70).abs() < f64::EPSILON);
292    }
293
294    #[test]
295    fn score_clamps_to_one() {
296        let mut tracker = AbuseTracker::new(test_config());
297        for _ in 0..20 {
298            tracker.record_signal("spammer", SignalType::SensitiveProbe);
299        }
300        let (_, score) = tracker.evaluate("spammer");
301        assert!((score - 1.0).abs() < f64::EPSILON);
302    }
303
304    #[test]
305    fn eviction_respects_cap() {
306        let cfg = AbuseConfig {
307            max_tracked_actors: 3,
308            ..test_config()
309        };
310        let mut tracker = AbuseTracker::new(cfg);
311        for i in 0..5 {
312            tracker.record_signal(&format!("actor-{i}"), SignalType::RateBurst);
313        }
314        assert!(tracker.actors.len() <= 3);
315    }
316
317    #[test]
318    fn snapshot_returns_all_tracked() {
319        let mut tracker = AbuseTracker::new(test_config());
320        tracker.record_signal("a", SignalType::RateBurst);
321        tracker.record_signal("b", SignalType::PolicyViolation);
322        let snap = tracker.snapshot();
323        assert_eq!(snap.len(), 2);
324    }
325}