1use crate::rules::RuleEngine;
4use crate::source::IntakeEvent;
5use std::collections::HashMap;
6use std::sync::{Arc, RwLock};
7use std::time::{Duration, Instant};
8
/// Suppresses repeated intake events seen within a rolling time window.
pub struct Deduplicator {
    /// Event key -> the instant it was last recorded.
    seen_events: Arc<RwLock<HashMap<String, Instant>>>,
    /// How long a recorded key suppresses identical events.
    window: Duration,
    /// Soft cap on tracked keys; cleanup is attempted once exceeded.
    max_entries: usize,
}
18
19impl Deduplicator {
20 pub fn new(window_hours: u64, max_entries: usize) -> Self {
22 Self {
23 seen_events: Arc::new(RwLock::new(HashMap::new())),
24 window: Duration::from_secs(window_hours * 3600),
25 max_entries,
26 }
27 }
28
29 pub fn is_duplicate(&self, event: &IntakeEvent) -> bool {
31 let key = self.compute_event_key(event);
32 let now = Instant::now();
33
34 let mut seen = self.seen_events.write().unwrap();
35
36 if let Some(ts) = seen.get(&key) {
38 if now.duration_since(*ts) < self.window {
39 return true;
40 }
41 }
42
43 seen.insert(key, now);
45
46 if seen.len() > self.max_entries {
48 seen.retain(|_, ts| now.duration_since(*ts) < self.window);
49 }
50
51 false
52 }
53
54 fn compute_event_key(&self, event: &IntakeEvent) -> String {
56 format!(
58 "{}:{}:{}",
59 event.source_type,
60 event.source_event_id.as_deref().unwrap_or("none"),
61 event.title
62 )
63 }
64
65 pub fn stats(&self) -> DeduplicatorStats {
67 let seen = self.seen_events.read().unwrap();
68 DeduplicatorStats {
69 tracked_events: seen.len(),
70 window_hours: self.window.as_secs() / 3600,
71 }
72 }
73}
74
75impl Default for Deduplicator {
76 fn default() -> Self {
77 Self::new(24, 10000)
78 }
79}
80
/// Point-in-time view of a [`Deduplicator`]'s state.
#[derive(Debug, Clone)]
pub struct DeduplicatorStats {
    /// Number of event keys currently tracked.
    pub tracked_events: usize,
    /// Dedup window size, in whole hours.
    pub window_hours: u64,
}
87
/// Scores intake events on 0..=100 as a weighted blend of severity,
/// signal confidence, recency, and source reliability.
pub struct PriorityEvaluator {
    /// Relative weight of each scoring component.
    weights: PriorityWeights,
}
93
/// Relative weights for each priority component.
///
/// The defaults sum to 1.0; the final score is clamped to 0..=100 either way,
/// so other distributions compress or saturate the range rather than break it.
#[derive(Clone, Debug)]
pub struct PriorityWeights {
    /// Weight of the event's severity level.
    pub severity: f32,
    /// Weight of the mean extracted-signal confidence.
    pub confidence: f32,
    /// Weight of how recently the event occurred.
    pub recency: f32,
    /// Weight of the source integration's reliability prior.
    pub source_reliability: f32,
}
106
107impl Default for PriorityWeights {
108 fn default() -> Self {
109 Self {
110 severity: 0.4,
111 confidence: 0.3,
112 recency: 0.15,
113 source_reliability: 0.15,
114 }
115 }
116}
117
118impl PriorityEvaluator {
119 pub fn new(weights: PriorityWeights) -> Self {
121 Self { weights }
122 }
123
124 pub fn evaluate(&self, event: &IntakeEvent, signals: &[crate::signal::ExtractedSignal]) -> i32 {
126 let severity_score = self.evaluate_severity(event);
127 let confidence_score = self.evaluate_confidence(signals);
128 let recency_score = self.evaluate_recency(event);
129 let source_score = self.evaluate_source(event);
130
131 let score = (severity_score * self.weights.severity
132 + confidence_score * self.weights.confidence
133 + recency_score * self.weights.recency
134 + source_score * self.weights.source_reliability) as i32;
135
136 score.max(0).min(100)
137 }
138
139 fn evaluate_severity(&self, event: &IntakeEvent) -> f32 {
140 match event.severity {
141 crate::source::IssueSeverity::Critical => 100.0,
142 crate::source::IssueSeverity::High => 75.0,
143 crate::source::IssueSeverity::Medium => 50.0,
144 crate::source::IssueSeverity::Low => 25.0,
145 crate::source::IssueSeverity::Info => 10.0,
146 }
147 }
148
149 fn evaluate_confidence(&self, signals: &[crate::signal::ExtractedSignal]) -> f32 {
150 if signals.is_empty() {
151 return 50.0; }
153
154 let avg_confidence: f32 =
155 signals.iter().map(|s| s.confidence).sum::<f32>() / signals.len() as f32;
156
157 avg_confidence * 100.0
158 }
159
160 fn evaluate_recency(&self, event: &IntakeEvent) -> f32 {
161 let age_hours = (chrono::Utc::now().timestamp_millis() - event.timestamp_ms) as f32
162 / (1000.0 * 60.0 * 60.0);
163
164 let score = 100.0 - (age_hours * 100.0 / 7.0 / 24.0);
166 score.max(0.0).min(100.0)
167 }
168
169 fn evaluate_source(&self, event: &IntakeEvent) -> f32 {
170 match event.source_type {
172 crate::source::IntakeSourceType::Github => 90.0,
173 crate::source::IntakeSourceType::Gitlab => 90.0,
174 crate::source::IntakeSourceType::Prometheus => 85.0,
175 crate::source::IntakeSourceType::Sentry => 80.0,
176 crate::source::IntakeSourceType::LogFile => 60.0,
177 crate::source::IntakeSourceType::Http => 50.0,
178 }
179 }
180}
181
182impl Default for PriorityEvaluator {
183 fn default() -> Self {
184 Self::new(PriorityWeights::default())
185 }
186}
187
/// Sliding-window rate limiter with an additional concurrent-operation cap.
pub struct RateLimiter {
    /// Timestamps of requests admitted within (roughly) the last minute.
    requests: Arc<RwLock<Vec<Instant>>>,
    /// Maximum admissions per rolling minute.
    max_per_minute: usize,
    /// Maximum operations in flight at once.
    max_concurrent: usize,
    /// Count of acquired-but-not-yet-released operations.
    active: Arc<RwLock<usize>>,
    /// Retry hint, in seconds, returned to rejected callers.
    backoff_seconds: u64,
}
201
202impl RateLimiter {
203 pub fn new(max_per_minute: usize, max_concurrent: usize, backoff_seconds: u64) -> Self {
205 Self {
206 requests: Arc::new(RwLock::new(Vec::new())),
207 max_per_minute,
208 max_concurrent,
209 active: Arc::new(RwLock::new(0)),
210 backoff_seconds,
211 }
212 }
213
214 pub fn try_acquire(&self) -> Result<(), u64> {
217 let now = Instant::now();
218
219 {
221 let active = self.active.read().unwrap();
222 if *active >= self.max_concurrent {
223 return Err(self.backoff_seconds);
224 }
225 }
226
227 {
229 let mut requests = self.requests.write().unwrap();
230
231 let one_minute_ago = now - Duration::from_secs(60);
233 requests.retain(|ts| *ts > one_minute_ago);
234
235 if requests.len() >= self.max_per_minute {
236 return Err(self.backoff_seconds);
237 }
238
239 requests.push(now);
240 }
241
242 {
244 let mut active = self.active.write().unwrap();
245 *active += 1;
246 }
247
248 Ok(())
249 }
250
251 pub fn release(&self) {
253 let mut active = self.active.write().unwrap();
254 if *active > 0 {
255 *active -= 1;
256 }
257 }
258
259 pub fn stats(&self) -> RateLimiterStats {
261 let active = *self.active.read().unwrap();
262 let requests = self.requests.read().unwrap();
263 let now = Instant::now();
264 let one_minute_ago = now - Duration::from_secs(60);
265 let recent_count = requests.iter().filter(|ts| **ts > one_minute_ago).count();
266
267 RateLimiterStats {
268 active_operations: active,
269 requests_last_minute: recent_count,
270 max_per_minute: self.max_per_minute,
271 max_concurrent: self.max_concurrent,
272 }
273 }
274}
275
276impl Default for RateLimiter {
277 fn default() -> Self {
278 Self::new(60, 10, 60)
279 }
280}
281
/// Point-in-time view of a [`RateLimiter`]'s utilization.
#[derive(Debug, Clone)]
pub struct RateLimiterStats {
    /// Operations currently acquired and not yet released.
    pub active_operations: usize,
    /// Requests admitted within the last minute.
    pub requests_last_minute: usize,
    /// Configured per-minute admission cap.
    pub max_per_minute: usize,
    /// Configured concurrency cap.
    pub max_concurrent: usize,
}
290
/// End-to-end intake pipeline: dedup -> rule filtering -> rate limiting
/// -> priority scoring.
pub struct AutoPrioritizer {
    /// Stage 1: drops events already seen inside the dedup window.
    deduplicator: Deduplicator,
    /// Stage 2: applies user-configured intake rules (may skip the event).
    rule_engine: RuleEngine,
    /// Stage 4: computes the final 0..=100 priority score.
    evaluator: PriorityEvaluator,
    /// Stage 3: throughput/concurrency gate.
    limiter: RateLimiter,
}
298
299impl AutoPrioritizer {
300 pub fn new(
302 deduplicator: Deduplicator,
303 rule_engine: RuleEngine,
304 evaluator: PriorityEvaluator,
305 limiter: RateLimiter,
306 ) -> Self {
307 Self {
308 deduplicator,
309 rule_engine,
310 evaluator,
311 limiter,
312 }
313 }
314
315 pub fn with_rule_engine(mut self, rule_engine: RuleEngine) -> Self {
317 self.rule_engine = rule_engine;
318 self
319 }
320
321 pub fn process(
323 &self,
324 event: &IntakeEvent,
325 signals: &[crate::signal::ExtractedSignal],
326 ) -> PrioritizationResult {
327 if self.deduplicator.is_duplicate(event) {
329 return PrioritizationResult::Duplicate;
330 }
331
332 let rule_result = self.rule_engine.apply(event, signals);
333 if rule_result.should_skip {
334 return PrioritizationResult::Filtered {
335 rule_ids: rule_result
336 .applications
337 .into_iter()
338 .map(|application| application.rule_id)
339 .collect(),
340 };
341 }
342
343 let event = rule_result.event;
344
345 if let Err(backoff) = self.limiter.try_acquire() {
347 return PrioritizationResult::RateLimited(backoff);
348 }
349
350 let priority = self.evaluator.evaluate(&event, signals);
352
353 PrioritizationResult::Processed(PrioritizedEvent { event, priority })
354 }
355
356 pub fn release(&self) {
358 self.limiter.release();
359 }
360
361 pub fn stats(&self) -> PrioritizerStats {
363 PrioritizerStats {
364 deduplicator: self.deduplicator.stats(),
365 rate_limiter: self.limiter.stats(),
366 }
367 }
368}
369
370impl Default for AutoPrioritizer {
371 fn default() -> Self {
372 Self::new(
373 Deduplicator::default(),
374 RuleEngine::default(),
375 PriorityEvaluator::default(),
376 RateLimiter::default(),
377 )
378 }
379}
380
/// Outcome of running one event through [`AutoPrioritizer::process`].
#[derive(Debug)]
pub enum PrioritizationResult {
    /// Event passed every stage; carries the scored event.
    Processed(PrioritizedEvent),
    /// An equivalent event was already seen inside the dedup window.
    Duplicate,
    /// A rule requested a skip; lists the ids of the rules that applied.
    Filtered { rule_ids: Vec<String> },
    /// Rate/concurrency limit hit; payload is the backoff hint in seconds.
    RateLimited(u64),
}
393
/// An intake event paired with its computed priority.
#[derive(Debug, Clone)]
pub struct PrioritizedEvent {
    /// The event, possibly transformed by the rule engine.
    pub event: IntakeEvent,
    /// Priority score in 0..=100 (higher is more urgent).
    pub priority: i32,
}
400
/// Combined snapshot of the prioritizer's stateful stages.
#[derive(Debug)]
pub struct PrioritizerStats {
    /// Deduplicator tracking state.
    pub deduplicator: DeduplicatorStats,
    /// Rate-limiter utilization.
    pub rate_limiter: RateLimiterStats,
}
407
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rules::{IntakeRule, RuleAction, RuleConditions};
    use crate::source::{IntakeSourceType, IssueSeverity};

    // First sighting of an event is not a duplicate; an identical event
    // seen again inside the 24h window is.
    #[test]
    fn test_deduplication() {
        let dedup = Deduplicator::new(24, 100);

        let event = IntakeEvent {
            event_id: "test-1".to_string(),
            source_type: IntakeSourceType::Github,
            source_event_id: Some("run-123".to_string()),
            title: "Build failed".to_string(),
            description: "Test".to_string(),
            severity: IssueSeverity::High,
            signals: vec![],
            raw_payload: None,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        };

        // First call records the event.
        assert!(!dedup.is_duplicate(&event));
        // Second call with the same key hits the window.
        assert!(dedup.is_duplicate(&event));
    }

    // A critical, fresh, high-confidence Github event should score well
    // above the midpoint with the default weights.
    #[test]
    fn test_priority_evaluation() {
        let evaluator = PriorityEvaluator::default();

        let event = IntakeEvent {
            event_id: "test-1".to_string(),
            source_type: IntakeSourceType::Github,
            source_event_id: None,
            title: "Critical bug".to_string(),
            description: "Test".to_string(),
            severity: IssueSeverity::Critical,
            signals: vec![],
            raw_payload: None,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        };

        let signals = vec![crate::signal::ExtractedSignal {
            signal_id: "sig-1".to_string(),
            content: "test".to_string(),
            signal_type: crate::signal::SignalType::CompilerError,
            confidence: 0.9,
            source: "test".to_string(),
        }];

        let priority = evaluator.evaluate(&event, &signals);
        assert!(priority >= 50); }

    // With a concurrency cap of 5, the sixth un-released acquire must fail
    // (the per-minute budget of 10 is not the limiting factor here).
    #[test]
    fn test_rate_limiter() {
        let limiter = RateLimiter::new(10, 5, 1);

        for _ in 0..5 {
            assert!(limiter.try_acquire().is_ok());
        }

        assert!(limiter.try_acquire().is_err());
    }

    // A fresh, unfiltered event flows through the whole default pipeline
    // and comes out Processed.
    #[test]
    fn test_auto_prioritizer() {
        let prioritizer = AutoPrioritizer::default();

        let event = IntakeEvent {
            event_id: "test-1".to_string(),
            source_type: IntakeSourceType::Github,
            source_event_id: Some("run-456".to_string()),
            title: "Test issue".to_string(),
            description: "Test".to_string(),
            severity: IssueSeverity::Medium,
            signals: vec![],
            raw_payload: None,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        };

        let result = prioritizer.process(&event, &[]);
        assert!(matches!(result, PrioritizationResult::Processed(_)));
    }

    // A Skip rule matching the event's source type must yield Filtered and
    // report exactly the id of the rule that fired.
    #[test]
    fn test_auto_prioritizer_filters_event_via_rule_engine() {
        let prioritizer =
            AutoPrioritizer::default().with_rule_engine(RuleEngine::with_rules(vec![IntakeRule {
                id: "skip_http".to_string(),
                name: "Skip http events".to_string(),
                description: "filter low-value http events".to_string(),
                priority: 100,
                enabled: true,
                conditions: RuleConditions {
                    source_types: vec!["http".to_string()],
                    ..Default::default()
                },
                actions: vec![RuleAction::Skip],
            }]));

        let event = IntakeEvent {
            event_id: "test-filter".to_string(),
            source_type: IntakeSourceType::Http,
            source_event_id: Some("webhook-1".to_string()),
            title: "Noisy webhook".to_string(),
            description: "ignore this".to_string(),
            severity: IssueSeverity::Low,
            signals: vec![],
            raw_payload: None,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        };

        let result = prioritizer.process(&event, &[]);
        match result {
            PrioritizationResult::Filtered { rule_ids } => {
                assert_eq!(rule_ids, vec!["skip_http".to_string()]);
            }
            other => panic!("expected Filtered result, got {:?}", other),
        }
    }
}