// Source file: oris_intake/prioritize.rs
1//! Deduplication, priority evaluation, and rate limiting for intake events
2
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use crate::rules::RuleEngine;
use crate::source::IntakeEvent;
8
/// Deduplication store to track seen events
///
/// Events are keyed by source type, source event ID, and title
/// (see `compute_event_key`); a key seen again within `window`
/// is reported as a duplicate.
pub struct Deduplicator {
    /// Track seen event hashes with their timestamps
    seen_events: Arc<RwLock<HashMap<String, Instant>>>,
    /// Time window for deduplication (default: 24 hours)
    window: Duration,
    /// Maximum events to track
    max_entries: usize,
}
18
19impl Deduplicator {
20    /// Create a new deduplicator
21    pub fn new(window_hours: u64, max_entries: usize) -> Self {
22        Self {
23            seen_events: Arc::new(RwLock::new(HashMap::new())),
24            window: Duration::from_secs(window_hours * 3600),
25            max_entries,
26        }
27    }
28
29    /// Check if an event is a duplicate
30    pub fn is_duplicate(&self, event: &IntakeEvent) -> bool {
31        let key = self.compute_event_key(event);
32        let now = Instant::now();
33
34        let mut seen = self.seen_events.write().unwrap();
35
36        // Check if key exists and is within window
37        if let Some(ts) = seen.get(&key) {
38            if now.duration_since(*ts) < self.window {
39                return true;
40            }
41        }
42
43        // Add/update the event
44        seen.insert(key, now);
45
46        // Cleanup old entries if over limit
47        if seen.len() > self.max_entries {
48            seen.retain(|_, ts| now.duration_since(*ts) < self.window);
49        }
50
51        false
52    }
53
54    /// Compute a unique key for an event
55    fn compute_event_key(&self, event: &IntakeEvent) -> String {
56        // Combine source type, source event ID, and title for deduplication
57        format!(
58            "{}:{}:{}",
59            event.source_type,
60            event.source_event_id.as_deref().unwrap_or("none"),
61            event.title
62        )
63    }
64
65    /// Get statistics
66    pub fn stats(&self) -> DeduplicatorStats {
67        let seen = self.seen_events.read().unwrap();
68        DeduplicatorStats {
69            tracked_events: seen.len(),
70            window_hours: self.window.as_secs() / 3600,
71        }
72    }
73}
74
75impl Default for Deduplicator {
76    fn default() -> Self {
77        Self::new(24, 10000)
78    }
79}
80
/// Statistics about the deduplicator
#[derive(Debug, Clone)]
pub struct DeduplicatorStats {
    /// Number of event keys currently held in the dedup store.
    pub tracked_events: usize,
    /// Deduplication window expressed in whole hours.
    pub window_hours: u64,
}
87
/// Priority evaluator for intake events
///
/// Produces a 0-100 urgency score from severity, signal confidence,
/// recency, and source reliability, combined via `PriorityWeights`.
pub struct PriorityEvaluator {
    /// Weights for different factors
    weights: PriorityWeights,
}
93
/// Weights for priority calculation
///
/// Each weight scales a 0-100 sub-score; the defaults sum to 1.0 so the
/// weighted sum stays within 0-100 before clamping.
#[derive(Clone, Debug)]
pub struct PriorityWeights {
    /// Weight for severity (0-100)
    pub severity: f32,
    /// Weight for signal confidence
    pub confidence: f32,
    /// Weight for recency (recent events get higher priority)
    pub recency: f32,
    /// Weight for source reliability
    pub source_reliability: f32,
}
106
107impl Default for PriorityWeights {
108    fn default() -> Self {
109        Self {
110            severity: 0.4,
111            confidence: 0.3,
112            recency: 0.15,
113            source_reliability: 0.15,
114        }
115    }
116}
117
118impl PriorityEvaluator {
119    /// Create a new priority evaluator
120    pub fn new(weights: PriorityWeights) -> Self {
121        Self { weights }
122    }
123
124    /// Evaluate priority for an event (0-100, higher is more urgent)
125    pub fn evaluate(&self, event: &IntakeEvent, signals: &[crate::signal::ExtractedSignal]) -> i32 {
126        let severity_score = self.evaluate_severity(event);
127        let confidence_score = self.evaluate_confidence(signals);
128        let recency_score = self.evaluate_recency(event);
129        let source_score = self.evaluate_source(event);
130
131        let score = (severity_score * self.weights.severity
132            + confidence_score * self.weights.confidence
133            + recency_score * self.weights.recency
134            + source_score * self.weights.source_reliability) as i32;
135
136        score.max(0).min(100)
137    }
138
139    fn evaluate_severity(&self, event: &IntakeEvent) -> f32 {
140        match event.severity {
141            crate::source::IssueSeverity::Critical => 100.0,
142            crate::source::IssueSeverity::High => 75.0,
143            crate::source::IssueSeverity::Medium => 50.0,
144            crate::source::IssueSeverity::Low => 25.0,
145            crate::source::IssueSeverity::Info => 10.0,
146        }
147    }
148
149    fn evaluate_confidence(&self, signals: &[crate::signal::ExtractedSignal]) -> f32 {
150        if signals.is_empty() {
151            return 50.0; // Default
152        }
153
154        let avg_confidence: f32 =
155            signals.iter().map(|s| s.confidence).sum::<f32>() / signals.len() as f32;
156
157        avg_confidence * 100.0
158    }
159
160    fn evaluate_recency(&self, event: &IntakeEvent) -> f32 {
161        let age_hours = (chrono::Utc::now().timestamp_millis() - event.timestamp_ms) as f32
162            / (1000.0 * 60.0 * 60.0);
163
164        // Score decreases linearly over 7 days
165        let score = 100.0 - (age_hours * 100.0 / 7.0 / 24.0);
166        score.max(0.0).min(100.0)
167    }
168
169    fn evaluate_source(&self, event: &IntakeEvent) -> f32 {
170        // Higher reliability for verified CI/CD sources
171        match event.source_type {
172            crate::source::IntakeSourceType::Github => 90.0,
173            crate::source::IntakeSourceType::Gitlab => 90.0,
174            crate::source::IntakeSourceType::Prometheus => 85.0,
175            crate::source::IntakeSourceType::Sentry => 80.0,
176            crate::source::IntakeSourceType::LogFile => 60.0,
177            crate::source::IntakeSourceType::Http => 50.0,
178        }
179    }
180}
181
182impl Default for PriorityEvaluator {
183    fn default() -> Self {
184        Self::new(PriorityWeights::default())
185    }
186}
187
/// Rate limiter for intake events
///
/// Enforces two independent limits: a rolling requests-per-minute budget
/// and a cap on concurrently active operations (paired `try_acquire` /
/// `release` calls).
pub struct RateLimiter {
    /// Track request timestamps
    requests: Arc<RwLock<Vec<Instant>>>,
    /// Maximum requests per minute
    max_per_minute: usize,
    /// Maximum concurrent operations
    max_concurrent: usize,
    /// Currently active operations
    active: Arc<RwLock<usize>>,
    /// Backoff duration in seconds
    backoff_seconds: u64,
}

impl RateLimiter {
    /// Create a new rate limiter.
    pub fn new(max_per_minute: usize, max_concurrent: usize, backoff_seconds: u64) -> Self {
        Self {
            requests: Arc::new(RwLock::new(Vec::new())),
            max_per_minute,
            max_concurrent,
            active: Arc::new(RwLock::new(0)),
            backoff_seconds,
        }
    }

    /// Try to acquire permission to process an event.
    /// Returns Ok(()) if allowed, Err(backoff_seconds) if rate limited.
    pub fn try_acquire(&self) -> Result<(), u64> {
        let now = Instant::now();

        // Hold the write lock on `active` across the whole check-and-increment.
        // The previous version checked under a read lock and incremented later
        // under a separate write lock, so two threads could both pass the
        // check and push the active count above `max_concurrent`.
        let mut active = self.active.write().unwrap();
        if *active >= self.max_concurrent {
            return Err(self.backoff_seconds);
        }

        {
            let mut requests = self.requests.write().unwrap();

            // Remove requests older than one minute. `checked_sub` avoids the
            // panic `now - Duration` raises when `now` is within 60s of the
            // platform's Instant origin; in that case nothing can be stale.
            if let Some(cutoff) = now.checked_sub(Duration::from_secs(60)) {
                requests.retain(|ts| *ts > cutoff);
            }

            if requests.len() >= self.max_per_minute {
                return Err(self.backoff_seconds);
            }

            requests.push(now);
        }

        // Still under the `active` write lock, so this is atomic with the
        // capacity check above.
        *active += 1;

        Ok(())
    }

    /// Release a previously acquired permission.
    pub fn release(&self) {
        let mut active = self.active.write().unwrap();
        // Saturate at zero so an unmatched release cannot underflow.
        if *active > 0 {
            *active -= 1;
        }
    }

    /// Get current stats.
    pub fn stats(&self) -> RateLimiterStats {
        // Lock order (active, then requests) matches `try_acquire` to avoid
        // any chance of deadlock.
        let active = *self.active.read().unwrap();
        let requests = self.requests.read().unwrap();
        let now = Instant::now();
        let recent_count = match now.checked_sub(Duration::from_secs(60)) {
            Some(cutoff) => requests.iter().filter(|ts| **ts > cutoff).count(),
            // Process younger than a minute: every recorded request is recent.
            None => requests.len(),
        };

        RateLimiterStats {
            active_operations: active,
            requests_last_minute: recent_count,
            max_per_minute: self.max_per_minute,
            max_concurrent: self.max_concurrent,
        }
    }
}

impl Default for RateLimiter {
    /// Default: 60 requests/minute, 10 concurrent, 60s backoff.
    fn default() -> Self {
        Self::new(60, 10, 60)
    }
}

/// Statistics about the rate limiter
#[derive(Debug, Clone)]
pub struct RateLimiterStats {
    /// Operations currently acquired but not yet released.
    pub active_operations: usize,
    /// Requests recorded within the last rolling minute.
    pub requests_last_minute: usize,
    /// Configured per-minute request budget.
    pub max_per_minute: usize,
    /// Configured concurrency cap.
    pub max_concurrent: usize,
}
290
/// Auto-prioritizer combining deduplication, priority evaluation, and rate limiting
///
/// Pipeline order in `process`: dedup -> rule engine -> rate limit -> priority.
pub struct AutoPrioritizer {
    // Drops repeat events seen within the dedup window.
    deduplicator: Deduplicator,
    // May rewrite or skip events before prioritization.
    rule_engine: RuleEngine,
    // Computes the 0-100 priority score.
    evaluator: PriorityEvaluator,
    // Throttles throughput and concurrency.
    limiter: RateLimiter,
}
298
299impl AutoPrioritizer {
300    /// Create a new auto-prioritizer
301    pub fn new(
302        deduplicator: Deduplicator,
303        rule_engine: RuleEngine,
304        evaluator: PriorityEvaluator,
305        limiter: RateLimiter,
306    ) -> Self {
307        Self {
308            deduplicator,
309            rule_engine,
310            evaluator,
311            limiter,
312        }
313    }
314
315    /// Override the rule engine used between deduplication and prioritization.
316    pub fn with_rule_engine(mut self, rule_engine: RuleEngine) -> Self {
317        self.rule_engine = rule_engine;
318        self
319    }
320
321    /// Process an event through the full prioritization pipeline
322    pub fn process(
323        &self,
324        event: &IntakeEvent,
325        signals: &[crate::signal::ExtractedSignal],
326    ) -> PrioritizationResult {
327        // Check deduplication
328        if self.deduplicator.is_duplicate(event) {
329            return PrioritizationResult::Duplicate;
330        }
331
332        let rule_result = self.rule_engine.apply(event, signals);
333        if rule_result.should_skip {
334            return PrioritizationResult::Filtered {
335                rule_ids: rule_result
336                    .applications
337                    .into_iter()
338                    .map(|application| application.rule_id)
339                    .collect(),
340            };
341        }
342
343        let event = rule_result.event;
344
345        // Check rate limit
346        if let Err(backoff) = self.limiter.try_acquire() {
347            return PrioritizationResult::RateLimited(backoff);
348        }
349
350        // Evaluate priority
351        let priority = self.evaluator.evaluate(&event, signals);
352
353        PrioritizationResult::Processed(PrioritizedEvent { event, priority })
354    }
355
356    /// Release a processed event (for rate limiting)
357    pub fn release(&self) {
358        self.limiter.release();
359    }
360
361    /// Get stats for all components
362    pub fn stats(&self) -> PrioritizerStats {
363        PrioritizerStats {
364            deduplicator: self.deduplicator.stats(),
365            rate_limiter: self.limiter.stats(),
366        }
367    }
368}
369
370impl Default for AutoPrioritizer {
371    fn default() -> Self {
372        Self::new(
373            Deduplicator::default(),
374            RuleEngine::default(),
375            PriorityEvaluator::default(),
376            RateLimiter::default(),
377        )
378    }
379}
380
/// Result of prioritization
///
/// Returned by `AutoPrioritizer::process`; only `Processed` holds a
/// rate-limiter slot and therefore requires a matching `release()` call.
#[derive(Debug)]
pub enum PrioritizationResult {
    /// Event was processed successfully
    Processed(PrioritizedEvent),
    /// Event is a duplicate
    Duplicate,
    /// Event was filtered out by the rule engine.
    /// Carries the IDs of the rules that were applied.
    Filtered { rule_ids: Vec<String> },
    /// Event is rate limited; payload is the suggested backoff in seconds.
    RateLimited(u64),
}
393
/// A prioritized intake event
#[derive(Debug, Clone)]
pub struct PrioritizedEvent {
    /// The event, after any rule-engine rewrites.
    pub event: IntakeEvent,
    /// Urgency score in 0-100 (higher is more urgent).
    pub priority: i32,
}
400
/// Combined stats
#[derive(Debug)]
pub struct PrioritizerStats {
    /// Snapshot from the deduplicator.
    pub deduplicator: DeduplicatorStats,
    /// Snapshot from the rate limiter.
    pub rate_limiter: RateLimiterStats,
}
407
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rules::{IntakeRule, RuleAction, RuleConditions};
    use crate::source::{IntakeSourceType, IssueSeverity};

    // The same (source_type, source_event_id, title) key must be flagged
    // on its second appearance within the window.
    #[test]
    fn test_deduplication() {
        let dedup = Deduplicator::new(24, 100);

        let event = IntakeEvent {
            event_id: "test-1".to_string(),
            source_type: IntakeSourceType::Github,
            source_event_id: Some("run-123".to_string()),
            title: "Build failed".to_string(),
            description: "Test".to_string(),
            severity: IssueSeverity::High,
            signals: vec![],
            raw_payload: None,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        };

        // First time should not be duplicate
        assert!(!dedup.is_duplicate(&event));

        // Second time should be duplicate
        assert!(dedup.is_duplicate(&event));
    }

    // A fresh Critical event from a reliable source with a high-confidence
    // signal should score well above the midpoint.
    #[test]
    fn test_priority_evaluation() {
        let evaluator = PriorityEvaluator::default();

        let event = IntakeEvent {
            event_id: "test-1".to_string(),
            source_type: IntakeSourceType::Github,
            source_event_id: None,
            title: "Critical bug".to_string(),
            description: "Test".to_string(),
            severity: IssueSeverity::Critical,
            signals: vec![],
            raw_payload: None,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        };

        let signals = vec![crate::signal::ExtractedSignal {
            signal_id: "sig-1".to_string(),
            content: "test".to_string(),
            signal_type: crate::signal::SignalType::CompilerError,
            confidence: 0.9,
            source: "test".to_string(),
        }];

        let priority = evaluator.evaluate(&event, &signals);
        assert!(priority >= 50); // Should be high priority
    }

    // The concurrency cap (5) trips before the per-minute budget (10).
    #[test]
    fn test_rate_limiter() {
        let limiter = RateLimiter::new(10, 5, 1);

        // Should be able to acquire up to limit
        for _ in 0..5 {
            assert!(limiter.try_acquire().is_ok());
        }

        // Sixth should fail due to concurrent limit
        assert!(limiter.try_acquire().is_err());
    }

    // Happy path through the full pipeline: no duplicate, no matching
    // skip rule, under rate limits.
    #[test]
    fn test_auto_prioritizer() {
        let prioritizer = AutoPrioritizer::default();

        let event = IntakeEvent {
            event_id: "test-1".to_string(),
            source_type: IntakeSourceType::Github,
            source_event_id: Some("run-456".to_string()),
            title: "Test issue".to_string(),
            description: "Test".to_string(),
            severity: IssueSeverity::Medium,
            signals: vec![],
            raw_payload: None,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        };

        let result = prioritizer.process(&event, &[]);
        assert!(matches!(result, PrioritizationResult::Processed(_)));
    }

    // A Skip rule matching the event's source type must yield Filtered,
    // reporting the id of the rule that fired.
    #[test]
    fn test_auto_prioritizer_filters_event_via_rule_engine() {
        let prioritizer =
            AutoPrioritizer::default().with_rule_engine(RuleEngine::with_rules(vec![IntakeRule {
                id: "skip_http".to_string(),
                name: "Skip http events".to_string(),
                description: "filter low-value http events".to_string(),
                priority: 100,
                enabled: true,
                conditions: RuleConditions {
                    source_types: vec!["http".to_string()],
                    ..Default::default()
                },
                actions: vec![RuleAction::Skip],
            }]));

        let event = IntakeEvent {
            event_id: "test-filter".to_string(),
            source_type: IntakeSourceType::Http,
            source_event_id: Some("webhook-1".to_string()),
            title: "Noisy webhook".to_string(),
            description: "ignore this".to_string(),
            severity: IssueSeverity::Low,
            signals: vec![],
            raw_payload: None,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        };

        let result = prioritizer.process(&event, &[]);
        match result {
            PrioritizationResult::Filtered { rule_ids } => {
                assert_eq!(rule_ids, vec!["skip_http".to_string()]);
            }
            other => panic!("expected Filtered result, got {:?}", other),
        }
    }
}