1use std::collections::{HashMap, VecDeque};
4
5use super::events::TeamEvent;
6
/// Window capacity used when a caller requests a window size of zero.
const DEFAULT_WINDOW_SIZE: usize = 20;
/// Notification threshold used when a caller passes zero.
const DEFAULT_NOTIFICATION_THRESHOLD: u32 = 3;
/// Severity threshold used when a caller passes zero.
const DEFAULT_SEVERITY_THRESHOLD: u32 = 5;
10
/// Categories of recurring failure reported by the failure window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PatternType {
    /// One role produced repeated error-carrying events.
    RepeatedTestFailure,
    /// Several `task_escalated` events were seen.
    EscalationCluster,
    /// Several conflict-related events were seen.
    MergeConflictRecurrence,
}

impl PatternType {
    /// Returns the fixed snake_case label for this pattern type.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::RepeatedTestFailure => "repeated_test_failure",
            Self::EscalationCluster => "escalation_cluster",
            Self::MergeConflictRecurrence => "merge_conflict_recurrence",
        }
    }
}
27
/// A single recurring-failure pattern detected in the failure window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PatternMatch {
    /// Which category of pattern was detected.
    pub pattern_type: PatternType,
    /// Number of window events that contributed to this pattern.
    pub frequency: u32,
    /// Identifiers (roles, tasks, or a timestamp fallback) tied to the pattern.
    pub affected_entities: Vec<String>,
    /// Human-readable summary of the detection.
    pub description: String,
}
35
/// A notification derived from a [`PatternMatch`] that met the notification threshold.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PatternNotification {
    /// Text describing the pattern and the suggested follow-up.
    pub message: String,
    /// Whether the manager should be notified.
    pub notify_manager: bool,
    /// Whether the architect should be notified (frequency reached the severity threshold).
    pub notify_architect: bool,
    /// Category of the pattern that triggered this notification.
    pub pattern_type: PatternType,
    /// Frequency of the pattern at the time the notification was built.
    pub frequency: u32,
}
44
/// Bounded FIFO of recent failure-relevant events used for pattern detection.
pub struct FailureWindow {
    /// Retained events, oldest at the front.
    events: VecDeque<FailureEvent>,
    /// Maximum number of events kept before the oldest are evicted.
    window_size: usize,
}
49
/// Wraps a [`FailureWindow`] and remembers what has already been notified.
pub struct FailureTracker {
    /// Sliding window of failure-relevant events.
    window: FailureWindow,
    /// Last notified frequency, keyed by pattern signature.
    last_notifications: HashMap<String, u32>,
}
54
/// The subset of a `TeamEvent` that the failure window keeps.
#[derive(Debug, Clone)]
struct FailureEvent {
    /// Event name copied from the source event.
    pub event_type: String,
    /// Role attached to the source event, if any.
    pub role: Option<String>,
    /// Task attached to the source event, if any.
    pub task: Option<String>,
    /// Error text attached to the source event, if any.
    pub error: Option<String>,
    /// Timestamp copied from the source event.
    pub ts: u64,
}
63
64impl FailureWindow {
65 pub fn new(window_size: usize) -> Self {
66 Self {
67 events: VecDeque::new(),
68 window_size: if window_size == 0 {
69 DEFAULT_WINDOW_SIZE
70 } else {
71 window_size
72 },
73 }
74 }
75
76 pub fn push(&mut self, event: &TeamEvent) {
77 if !is_failure_relevant(event) {
78 return;
79 }
80
81 self.events.push_back(FailureEvent {
82 event_type: event.event.clone(),
83 role: event.role.clone(),
84 task: event.task.clone(),
85 error: event.error.clone(),
86 ts: event.ts,
87 });
88
89 while self.events.len() > self.window_size {
90 self.events.pop_front();
91 }
92 }
93
94 pub fn detect_failure_patterns(&self) -> Vec<PatternMatch> {
95 let mut patterns = Vec::new();
96
97 let mut error_counts: HashMap<String, u32> = HashMap::new();
98 for event in &self.events {
99 if event.error.is_some() {
100 if let Some(role) = event.role.as_deref() {
101 *error_counts.entry(role.to_string()).or_insert(0) += 1;
102 }
103 }
104 }
105 for (role, frequency) in error_counts {
106 if frequency >= 2 {
107 patterns.push(PatternMatch {
108 pattern_type: PatternType::RepeatedTestFailure,
109 frequency,
110 affected_entities: vec![role.clone()],
111 description: format!(
112 "{role} hit {frequency} error events in the current failure window"
113 ),
114 });
115 }
116 }
117
118 let escalation_events: Vec<&FailureEvent> = self
119 .events
120 .iter()
121 .filter(|event| event.event_type == "task_escalated")
122 .collect();
123 if escalation_events.len() >= 2 {
124 let mut affected_entities: Vec<String> = escalation_events
125 .iter()
126 .filter_map(|event| event.task.clone())
127 .collect();
128 affected_entities.sort();
129 affected_entities.dedup();
130 patterns.push(PatternMatch {
131 pattern_type: PatternType::EscalationCluster,
132 frequency: escalation_events.len() as u32,
133 affected_entities,
134 description: format!(
135 "{} escalation events detected in the current failure window",
136 escalation_events.len()
137 ),
138 });
139 }
140
141 let conflict_events: Vec<&FailureEvent> = self
142 .events
143 .iter()
144 .filter(|event| {
145 contains_conflict(&event.event_type)
146 || event.error.as_deref().is_some_and(contains_conflict)
147 })
148 .collect();
149 if conflict_events.len() >= 2 {
150 let mut affected_entities: Vec<String> = conflict_events
151 .iter()
152 .filter_map(|event| {
153 event
154 .task
155 .clone()
156 .or_else(|| event.role.clone())
157 .or_else(|| Some(event.ts.to_string()))
158 })
159 .collect();
160 affected_entities.sort();
161 affected_entities.dedup();
162 patterns.push(PatternMatch {
163 pattern_type: PatternType::MergeConflictRecurrence,
164 frequency: conflict_events.len() as u32,
165 affected_entities,
166 description: format!(
167 "{} conflict-related events detected in the current failure window",
168 conflict_events.len()
169 ),
170 });
171 }
172
173 patterns.sort_by(|left, right| {
174 right
175 .frequency
176 .cmp(&left.frequency)
177 .then_with(|| {
178 pattern_sort_key(&left.pattern_type).cmp(&pattern_sort_key(&right.pattern_type))
179 })
180 .then_with(|| left.description.cmp(&right.description))
181 });
182 patterns
183 }
184}
185
186impl FailureTracker {
187 pub fn new(window_size: usize) -> Self {
188 Self {
189 window: FailureWindow::new(window_size),
190 last_notifications: HashMap::new(),
191 }
192 }
193
194 pub fn push(&mut self, event: &TeamEvent) {
195 self.window.push(event);
196 }
197
198 pub fn pattern_notifications(
199 &mut self,
200 notification_threshold: u32,
201 severity_threshold: u32,
202 ) -> Vec<PatternNotification> {
203 let (notification_threshold, severity_threshold) =
204 normalized_thresholds(notification_threshold, severity_threshold);
205
206 self.window
207 .detect_failure_patterns()
208 .into_iter()
209 .filter(|pattern| pattern.frequency >= notification_threshold)
210 .filter_map(|pattern| {
211 let signature = pattern_signature(&pattern);
212 let last_frequency = self
213 .last_notifications
214 .get(&signature)
215 .copied()
216 .unwrap_or(0);
217 if pattern.frequency <= last_frequency {
218 return None;
219 }
220 self.last_notifications.insert(signature, pattern.frequency);
221 Some(build_pattern_notification(&pattern, severity_threshold))
222 })
223 .collect()
224 }
225}
226
227fn pattern_sort_key(pattern_type: &PatternType) -> u8 {
228 match pattern_type {
229 PatternType::RepeatedTestFailure => 0,
230 PatternType::EscalationCluster => 1,
231 PatternType::MergeConflictRecurrence => 2,
232 }
233}
234
235fn is_failure_relevant(event: &TeamEvent) -> bool {
236 event.event == "task_escalated"
237 || event.error.is_some()
238 || contains_failure_keyword(&event.event)
239}
240
/// Returns `true` when `value` contains "fail" or "conflict", ignoring ASCII case.
fn contains_failure_keyword(value: &str) -> bool {
    let lowered = value.to_ascii_lowercase();
    ["fail", "conflict"]
        .iter()
        .any(|keyword| lowered.contains(*keyword))
}
245
/// Returns `true` when `value` contains "conflict", ignoring ASCII case.
fn contains_conflict(value: &str) -> bool {
    const NEEDLE: &[u8] = b"conflict";

    // The needle is pure ASCII, so a byte-level, ASCII-case-insensitive window
    // match is equivalent to searching the ASCII-lowercased string.
    value
        .as_bytes()
        .windows(NEEDLE.len())
        .any(|window| window.eq_ignore_ascii_case(NEEDLE))
}
249
250#[cfg_attr(not(test), allow(dead_code))]
251pub fn generate_pattern_notifications(
252 patterns: &[PatternMatch],
253 notification_threshold: u32,
254 severity_threshold: u32,
255) -> Vec<PatternNotification> {
256 let (notification_threshold, severity_threshold) =
257 normalized_thresholds(notification_threshold, severity_threshold);
258
259 patterns
260 .iter()
261 .filter(|pattern| pattern.frequency >= notification_threshold)
262 .map(|pattern| build_pattern_notification(pattern, severity_threshold))
263 .collect()
264}
265
266fn normalized_thresholds(notification_threshold: u32, severity_threshold: u32) -> (u32, u32) {
267 let notification_threshold = if notification_threshold == 0 {
268 DEFAULT_NOTIFICATION_THRESHOLD
269 } else {
270 notification_threshold
271 };
272 let severity_threshold = if severity_threshold == 0 {
273 DEFAULT_SEVERITY_THRESHOLD
274 } else {
275 severity_threshold
276 };
277 (notification_threshold, severity_threshold)
278}
279
280fn build_pattern_notification(
281 pattern: &PatternMatch,
282 severity_threshold: u32,
283) -> PatternNotification {
284 PatternNotification {
285 message: pattern_notification_message(pattern),
286 notify_manager: true,
287 notify_architect: pattern.frequency >= severity_threshold,
288 pattern_type: pattern.pattern_type.clone(),
289 frequency: pattern.frequency,
290 }
291}
292
293fn pattern_signature(pattern: &PatternMatch) -> String {
294 format!(
295 "{}:{}",
296 pattern.pattern_type.as_str(),
297 pattern.affected_entities.join(",")
298 )
299}
300
301fn pattern_notification_message(pattern: &PatternMatch) -> String {
302 let affected = format_affected_entities(&pattern.affected_entities);
303 match pattern.pattern_type {
304 PatternType::RepeatedTestFailure => format!(
305 "Repeated test failures for {affected} ({}) in the recent window. Review the failing runs and stabilize before retrying.",
306 pattern.frequency
307 ),
308 PatternType::EscalationCluster => format!(
309 "Escalations clustered across {affected} ({} total). Review blockers and rebalance or unblock the work.",
310 pattern.frequency
311 ),
312 PatternType::MergeConflictRecurrence => format!(
313 "Merge conflicts keep recurring across {affected} ({} total). Pause merges, rebase branches, and fix the shared hotspot.",
314 pattern.frequency
315 ),
316 }
317}
318
/// Joins `entities` with ", ", or returns "recent work" when there are none.
fn format_affected_entities(entities: &[String]) -> String {
    match entities {
        [] => "recent work".to_string(),
        listed => listed.join(", "),
    }
}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TeamEvent` with only the fields these tests rely on populated.
    fn team_event(
        event: &str,
        role: &str,
        task: Option<&str>,
        error: Option<&str>,
        ts: u64,
    ) -> TeamEvent {
        TeamEvent {
            event: event.to_string(),
            role: Some(role.to_string()),
            task: task.map(str::to_string),
            recipient: None,
            from: None,
            to: None,
            restart: None,
            restart_count: None,
            load: None,
            working_members: None,
            total_members: None,
            session_running: None,
            reason: None,
            step: None,
            error: error.map(str::to_string),
            uptime_secs: None,
            session_size_bytes: None,
            output_bytes: None,
            filename: None,
            content_hash: None,
            ts,
        }
    }

    /// An event with an error message and no task.
    fn error_event(event: &str, role: &str, error: &str, ts: u64) -> TeamEvent {
        team_event(event, role, None, Some(error), ts)
    }

    /// An event tied to a task, with an optional error message.
    fn conflict_event(
        event: &str,
        role: &str,
        task: &str,
        error: Option<&str>,
        ts: u64,
    ) -> TeamEvent {
        team_event(event, role, Some(task), error, ts)
    }

    /// Returns `event` with its timestamp overridden.
    fn at(mut event: TeamEvent, ts: u64) -> TeamEvent {
        event.ts = ts;
        event
    }

    /// Shorthand for building a `PatternMatch` fixture.
    fn pattern(
        pattern_type: PatternType,
        frequency: u32,
        entities: &[&str],
        description: &str,
    ) -> PatternMatch {
        PatternMatch {
            pattern_type,
            frequency,
            affected_entities: entities.iter().map(|entity| entity.to_string()).collect(),
            description: description.to_string(),
        }
    }

    /// Runs notification generation for one pattern with thresholds 3 and 5.
    fn notifications_for(pattern: PatternMatch) -> Vec<PatternNotification> {
        generate_pattern_notifications(&[pattern], 3, 5)
    }

    /// A tracker that has seen three test failures from `eng-1`.
    fn tracker_with_three_failures() -> FailureTracker {
        let mut tracker = FailureTracker::new(20);
        for ts in 1..=3 {
            tracker.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }
        tracker
    }

    #[test]
    fn test_detect_repeated_test_failures() {
        let mut window = FailureWindow::new(20);
        for ts in 1..=3 {
            window.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let found = &patterns[0];
        assert_eq!(found.pattern_type, PatternType::RepeatedTestFailure);
        assert_eq!(found.frequency, 3);
        assert_eq!(found.affected_entities, ["eng-1"]);
    }

    #[test]
    fn test_detect_escalation_cluster() {
        let mut window = FailureWindow::new(20);
        window.push(&at(TeamEvent::task_escalated("eng-1", "101", None), 1));
        window.push(&at(TeamEvent::task_escalated("eng-2", "102", None), 2));

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let found = &patterns[0];
        assert_eq!(found.pattern_type, PatternType::EscalationCluster);
        assert_eq!(found.frequency, 2);
        assert_eq!(found.affected_entities, ["101", "102"]);
    }

    #[test]
    fn test_detect_merge_conflict_recurrence() {
        let mut window = FailureWindow::new(20);
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 1));
        window.push(&conflict_event(
            "loop_step_error",
            "eng-1",
            "202",
            Some("rebase conflict on main"),
            2,
        ));

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let found = &patterns[0];
        assert_eq!(found.pattern_type, PatternType::MergeConflictRecurrence);
        assert_eq!(found.frequency, 2);
        assert_eq!(found.affected_entities, ["201", "202"]);
    }

    #[test]
    fn test_window_rollover() {
        let mut window = FailureWindow::new(5);
        for index in 0..10 {
            let task = format!("task-{index}");
            window.push(&conflict_event(
                "merge_conflict",
                "eng-1",
                &task,
                None,
                index,
            ));
        }

        let oldest = window.events.front().and_then(|event| event.task.as_deref());
        let newest = window.events.back().and_then(|event| event.task.as_deref());

        assert_eq!(window.events.len(), 5);
        assert_eq!(oldest, Some("task-5"));
        assert_eq!(newest, Some("task-9"));
    }

    #[test]
    fn test_no_patterns_when_below_threshold() {
        let mut window = FailureWindow::new(20);
        window.push(&error_event("test_failure", "eng-1", "tests failed", 1));
        window.push(&at(TeamEvent::task_escalated("eng-1", "101", None), 2));
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 3));

        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn test_non_failure_events_ignored() {
        let mut window = FailureWindow::new(20);
        let routed = at(TeamEvent::message_routed("manager", "eng-1"), 2);
        window.push(&TeamEvent::daemon_started());
        window.push(&routed);

        assert!(window.events.is_empty());
        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn notification_threshold_triggers() {
        let notifications = notifications_for(pattern(
            PatternType::RepeatedTestFailure,
            3,
            &["eng-1"],
            "eng-1 failing repeatedly",
        ));

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert_eq!(notifications[0].frequency, 3);
    }

    #[test]
    fn notification_below_threshold_is_silent() {
        let notifications = notifications_for(pattern(
            PatternType::RepeatedTestFailure,
            2,
            &["eng-1"],
            "eng-1 failing repeatedly",
        ));

        assert!(notifications.is_empty());
    }

    #[test]
    fn severity_routes_to_architect() {
        let notifications = notifications_for(pattern(
            PatternType::MergeConflictRecurrence,
            5,
            &["201", "202"],
            "conflicts recurring",
        ));

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_architect);
    }

    #[test]
    fn below_severity_routes_to_manager_only() {
        let notifications = notifications_for(pattern(
            PatternType::EscalationCluster,
            4,
            &["101", "102"],
            "escalations piling up",
        ));

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert!(!notifications[0].notify_architect);
    }

    #[test]
    fn notification_message_is_actionable() {
        let notifications = notifications_for(pattern(
            PatternType::MergeConflictRecurrence,
            4,
            &["201"],
            "conflicts recurring",
        ));

        let message = notifications[0].message.to_ascii_lowercase();
        for expected in ["pause", "rebase", "fix"] {
            assert!(message.contains(expected));
        }
    }

    #[test]
    fn tracker_suppresses_repeated_notifications_without_new_failures() {
        let mut tracker = tracker_with_three_failures();

        let initial = tracker.pattern_notifications(3, 5);
        let repeated = tracker.pattern_notifications(3, 5);

        assert_eq!(initial.len(), 1);
        assert!(repeated.is_empty());
    }

    #[test]
    fn tracker_reemits_when_pattern_frequency_increases() {
        let mut tracker = tracker_with_three_failures();
        assert_eq!(tracker.pattern_notifications(3, 5).len(), 1);

        tracker.push(&error_event("test_failure", "eng-1", "tests failed", 4));

        let notifications = tracker.pattern_notifications(3, 5);

        assert_eq!(notifications.len(), 1);
        assert_eq!(notifications[0].frequency, 4);
    }
}