1use std::collections::{HashMap, VecDeque};
4
5use super::events::TeamEvent;
6
/// Window capacity substituted when `FailureWindow::new` is given a size of zero.
const DEFAULT_WINDOW_SIZE: usize = 20;
/// Minimum pattern frequency for a notification, substituted when the caller passes zero.
const DEFAULT_NOTIFICATION_THRESHOLD: u32 = 3;
/// Frequency at which the architect is also notified, substituted when the caller passes zero.
const DEFAULT_SEVERITY_THRESHOLD: u32 = 5;
10
/// Kinds of recurring failure that the failure window can detect.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PatternType {
    /// A single role produced at least two events carrying an error in the window.
    RepeatedTestFailure,
    /// At least two `task_escalated` events are present in the window.
    EscalationCluster,
    /// At least two events mention "conflict" in their event name or error text.
    MergeConflictRecurrence,
}
17
18impl PatternType {
19 pub fn as_str(&self) -> &'static str {
20 match self {
21 PatternType::RepeatedTestFailure => "repeated_test_failure",
22 PatternType::EscalationCluster => "escalation_cluster",
23 PatternType::MergeConflictRecurrence => "merge_conflict_recurrence",
24 }
25 }
26}
27
/// One failure pattern detected in the current failure window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PatternMatch {
    /// Which kind of recurring failure was detected.
    pub pattern_type: PatternType,
    /// Number of events in the window that contributed to this pattern.
    pub frequency: u32,
    /// Roles, task identifiers, or (as a last resort) event `ts` values tied to the pattern.
    pub affected_entities: Vec<String>,
    /// Human-readable summary of the detection.
    pub description: String,
}
35
/// A notification derived from a `PatternMatch`, including who should receive it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PatternNotification {
    /// Text of the notification.
    pub message: String,
    /// Whether the manager should be notified; the builder in this file always sets this.
    pub notify_manager: bool,
    /// Whether the architect should be notified; set when the frequency reaches the
    /// severity threshold.
    pub notify_architect: bool,
    /// Kind of the pattern that triggered the notification.
    pub pattern_type: PatternType,
    /// Frequency of the pattern at the time the notification was built.
    pub frequency: u32,
}
44
/// Bounded queue of the most recent failure-relevant events.
pub struct FailureWindow {
    // Oldest event at the front, newest at the back.
    events: VecDeque<FailureEvent>,
    // Maximum number of events retained; older events are evicted on push.
    window_size: usize,
}
49
/// Combines a `FailureWindow` with a record of what has already been reported.
pub struct FailureTracker {
    // Recent failure-relevant events.
    window: FailureWindow,
    // Pattern signature -> frequency recorded when that pattern was last reported.
    last_notifications: HashMap<String, u32>,
}
54
/// The subset of `TeamEvent` fields kept in the failure window.
#[derive(Debug, Clone)]
struct FailureEvent {
    /// Copied from `TeamEvent::event`.
    pub event_type: String,
    /// Copied from `TeamEvent::role`.
    pub role: Option<String>,
    /// Copied from `TeamEvent::task`.
    pub task: Option<String>,
    /// Copied from `TeamEvent::error`.
    pub error: Option<String>,
    /// Copied from `TeamEvent::ts` (presumably a timestamp; confirm against `TeamEvent`).
    pub ts: u64,
}
63
64impl FailureWindow {
65 pub fn new(window_size: usize) -> Self {
66 Self {
67 events: VecDeque::new(),
68 window_size: if window_size == 0 {
69 DEFAULT_WINDOW_SIZE
70 } else {
71 window_size
72 },
73 }
74 }
75
76 pub fn push(&mut self, event: &TeamEvent) {
77 if !is_failure_relevant(event) {
78 return;
79 }
80
81 self.events.push_back(FailureEvent {
82 event_type: event.event.clone(),
83 role: event.role.clone(),
84 task: event.task.clone(),
85 error: event.error.clone(),
86 ts: event.ts,
87 });
88
89 while self.events.len() > self.window_size {
90 self.events.pop_front();
91 }
92 }
93
94 pub fn detect_failure_patterns(&self) -> Vec<PatternMatch> {
95 let mut patterns = Vec::new();
96
97 let mut error_counts: HashMap<String, u32> = HashMap::new();
98 for event in &self.events {
99 if event.error.is_some() {
100 if let Some(role) = event.role.as_deref() {
101 *error_counts.entry(role.to_string()).or_insert(0) += 1;
102 }
103 }
104 }
105 for (role, frequency) in error_counts {
106 if frequency >= 2 {
107 patterns.push(PatternMatch {
108 pattern_type: PatternType::RepeatedTestFailure,
109 frequency,
110 affected_entities: vec![role.clone()],
111 description: format!(
112 "{role} hit {frequency} error events in the current failure window"
113 ),
114 });
115 }
116 }
117
118 let escalation_events: Vec<&FailureEvent> = self
119 .events
120 .iter()
121 .filter(|event| event.event_type == "task_escalated")
122 .collect();
123 if escalation_events.len() >= 2 {
124 let mut affected_entities: Vec<String> = escalation_events
125 .iter()
126 .filter_map(|event| event.task.clone())
127 .collect();
128 affected_entities.sort();
129 affected_entities.dedup();
130 patterns.push(PatternMatch {
131 pattern_type: PatternType::EscalationCluster,
132 frequency: escalation_events.len() as u32,
133 affected_entities,
134 description: format!(
135 "{} escalation events detected in the current failure window",
136 escalation_events.len()
137 ),
138 });
139 }
140
141 let conflict_events: Vec<&FailureEvent> = self
142 .events
143 .iter()
144 .filter(|event| {
145 contains_conflict(&event.event_type)
146 || event.error.as_deref().is_some_and(contains_conflict)
147 })
148 .collect();
149 if conflict_events.len() >= 2 {
150 let mut affected_entities: Vec<String> = conflict_events
151 .iter()
152 .filter_map(|event| {
153 event
154 .task
155 .clone()
156 .or_else(|| event.role.clone())
157 .or_else(|| Some(event.ts.to_string()))
158 })
159 .collect();
160 affected_entities.sort();
161 affected_entities.dedup();
162 patterns.push(PatternMatch {
163 pattern_type: PatternType::MergeConflictRecurrence,
164 frequency: conflict_events.len() as u32,
165 affected_entities,
166 description: format!(
167 "{} conflict-related events detected in the current failure window",
168 conflict_events.len()
169 ),
170 });
171 }
172
173 patterns.sort_by(|left, right| {
174 right
175 .frequency
176 .cmp(&left.frequency)
177 .then_with(|| {
178 pattern_sort_key(&left.pattern_type).cmp(&pattern_sort_key(&right.pattern_type))
179 })
180 .then_with(|| left.description.cmp(&right.description))
181 });
182 patterns
183 }
184}
185
186impl FailureTracker {
187 pub fn new(window_size: usize) -> Self {
188 Self {
189 window: FailureWindow::new(window_size),
190 last_notifications: HashMap::new(),
191 }
192 }
193
194 pub fn push(&mut self, event: &TeamEvent) {
195 self.window.push(event);
196 }
197
198 pub fn pattern_notifications(
199 &mut self,
200 notification_threshold: u32,
201 severity_threshold: u32,
202 ) -> Vec<PatternNotification> {
203 let (notification_threshold, severity_threshold) =
204 normalized_thresholds(notification_threshold, severity_threshold);
205
206 self.window
207 .detect_failure_patterns()
208 .into_iter()
209 .filter(|pattern| pattern.frequency >= notification_threshold)
210 .filter_map(|pattern| {
211 let signature = pattern_signature(&pattern);
212 let last_frequency = self
213 .last_notifications
214 .get(&signature)
215 .copied()
216 .unwrap_or(0);
217 if pattern.frequency <= last_frequency {
218 return None;
219 }
220 self.last_notifications.insert(signature, pattern.frequency);
221 Some(build_pattern_notification(&pattern, severity_threshold))
222 })
223 .collect()
224 }
225}
226
227fn pattern_sort_key(pattern_type: &PatternType) -> u8 {
228 match pattern_type {
229 PatternType::RepeatedTestFailure => 0,
230 PatternType::EscalationCluster => 1,
231 PatternType::MergeConflictRecurrence => 2,
232 }
233}
234
235fn is_failure_relevant(event: &TeamEvent) -> bool {
236 event.event == "task_escalated"
237 || event.error.is_some()
238 || contains_failure_keyword(&event.event)
239}
240
/// Reports whether `value` mentions "fail" or "conflict", ignoring ASCII case.
fn contains_failure_keyword(value: &str) -> bool {
    let lowered = value.to_ascii_lowercase();
    ["fail", "conflict"]
        .iter()
        .any(|keyword| lowered.contains(*keyword))
}
245
/// Reports whether `value` mentions "conflict", ignoring ASCII case.
fn contains_conflict(value: &str) -> bool {
    let lowered = value.to_ascii_lowercase();
    lowered.contains("conflict")
}
249
250#[cfg_attr(not(test), allow(dead_code))]
251pub fn generate_pattern_notifications(
252 patterns: &[PatternMatch],
253 notification_threshold: u32,
254 severity_threshold: u32,
255) -> Vec<PatternNotification> {
256 let (notification_threshold, severity_threshold) =
257 normalized_thresholds(notification_threshold, severity_threshold);
258
259 patterns
260 .iter()
261 .filter(|pattern| pattern.frequency >= notification_threshold)
262 .map(|pattern| build_pattern_notification(pattern, severity_threshold))
263 .collect()
264}
265
266fn normalized_thresholds(notification_threshold: u32, severity_threshold: u32) -> (u32, u32) {
267 let notification_threshold = if notification_threshold == 0 {
268 DEFAULT_NOTIFICATION_THRESHOLD
269 } else {
270 notification_threshold
271 };
272 let severity_threshold = if severity_threshold == 0 {
273 DEFAULT_SEVERITY_THRESHOLD
274 } else {
275 severity_threshold
276 };
277 (notification_threshold, severity_threshold)
278}
279
280fn build_pattern_notification(
281 pattern: &PatternMatch,
282 severity_threshold: u32,
283) -> PatternNotification {
284 PatternNotification {
285 message: pattern_notification_message(pattern),
286 notify_manager: true,
287 notify_architect: pattern.frequency >= severity_threshold,
288 pattern_type: pattern.pattern_type.clone(),
289 frequency: pattern.frequency,
290 }
291}
292
293fn pattern_signature(pattern: &PatternMatch) -> String {
294 format!(
295 "{}:{}",
296 pattern.pattern_type.as_str(),
297 pattern.affected_entities.join(",")
298 )
299}
300
301fn pattern_notification_message(pattern: &PatternMatch) -> String {
302 let affected = format_affected_entities(&pattern.affected_entities);
303 match pattern.pattern_type {
304 PatternType::RepeatedTestFailure => format!(
305 "Repeated test failures for {affected} ({}) in the recent window. Review the failing runs and stabilize before retrying.",
306 pattern.frequency
307 ),
308 PatternType::EscalationCluster => format!(
309 "Escalations clustered across {affected} ({} total). Review blockers and rebalance or unblock the work.",
310 pattern.frequency
311 ),
312 PatternType::MergeConflictRecurrence => format!(
313 "Merge conflicts keep recurring across {affected} ({} total). Pause merges, rebase branches, and fix the shared hotspot.",
314 pattern.frequency
315 ),
316 }
317}
318
/// Renders the affected entities as a comma-separated list.
///
/// An empty list is rendered as the placeholder "recent work".
fn format_affected_entities(entities: &[String]) -> String {
    match entities {
        [] => String::from("recent work"),
        listed => listed.join(", "),
    }
}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event with only `event`, `role`, and `ts` populated.
    fn base_event(event: &str, role: &str, ts: u64) -> TeamEvent {
        TeamEvent {
            event: event.to_string(),
            action_type: None,
            version: None,
            git_ref: None,
            role: Some(role.to_string()),
            task: None,
            recipient: None,
            from: None,
            to: None,
            restart: None,
            restart_count: None,
            load: None,
            working_members: None,
            total_members: None,
            session_running: None,
            reason: None,
            step: None,
            error: None,
            details: None,
            uptime_secs: None,
            session_size_bytes: None,
            output_bytes: None,
            filename: None,
            content_hash: None,
            backend: None,
            success: None,
            merge_mode: None,
            narration_ratio: None,
            commit_frequency: None,
            first_pass_test_rate: None,
            retry_rate: None,
            time_to_completion_secs: None,
            ts,
        }
    }

    /// An event for `role` that carries an error and no task.
    fn error_event(event: &str, role: &str, error: &str, ts: u64) -> TeamEvent {
        TeamEvent {
            error: Some(error.to_string()),
            ..base_event(event, role, ts)
        }
    }

    /// An event for `role` on `task`, optionally carrying an error.
    fn conflict_event(
        event: &str,
        role: &str,
        task: &str,
        error: Option<&str>,
        ts: u64,
    ) -> TeamEvent {
        TeamEvent {
            task: Some(task.to_string()),
            error: error.map(str::to_string),
            ..base_event(event, role, ts)
        }
    }

    /// A `task_escalated` event with its `ts` overridden.
    fn escalation_event(role: &str, task: &str, ts: u64) -> TeamEvent {
        let mut escalated = TeamEvent::task_escalated(role, task, None);
        escalated.ts = ts;
        escalated
    }

    /// A hand-built pattern for the stateless notification tests.
    fn pattern(
        pattern_type: PatternType,
        frequency: u32,
        entities: &[&str],
        description: &str,
    ) -> PatternMatch {
        PatternMatch {
            pattern_type,
            frequency,
            affected_entities: entities.iter().map(|entity| entity.to_string()).collect(),
            description: description.to_string(),
        }
    }

    #[test]
    fn test_detect_repeated_test_failures() {
        let mut window = FailureWindow::new(20);
        for ts in 1..=3 {
            window.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let found = &patterns[0];
        assert_eq!(found.pattern_type, PatternType::RepeatedTestFailure);
        assert_eq!(found.frequency, 3);
        assert_eq!(found.affected_entities, vec!["eng-1".to_string()]);
    }

    #[test]
    fn test_detect_escalation_cluster() {
        let mut window = FailureWindow::new(20);
        window.push(&escalation_event("eng-1", "101", 1));
        window.push(&escalation_event("eng-2", "102", 2));

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let found = &patterns[0];
        assert_eq!(found.pattern_type, PatternType::EscalationCluster);
        assert_eq!(found.frequency, 2);
        assert_eq!(
            found.affected_entities,
            vec!["101".to_string(), "102".to_string()]
        );
    }

    #[test]
    fn test_detect_merge_conflict_recurrence() {
        let mut window = FailureWindow::new(20);
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 1));
        window.push(&conflict_event(
            "loop_step_error",
            "eng-1",
            "202",
            Some("rebase conflict on main"),
            2,
        ));

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let found = &patterns[0];
        assert_eq!(found.pattern_type, PatternType::MergeConflictRecurrence);
        assert_eq!(found.frequency, 2);
        assert_eq!(
            found.affected_entities,
            vec!["201".to_string(), "202".to_string()]
        );
    }

    #[test]
    fn test_window_rollover() {
        let mut window = FailureWindow::new(5);
        for index in 0..10 {
            let task = format!("task-{index}");
            window.push(&conflict_event(
                "merge_conflict",
                "eng-1",
                &task,
                None,
                index,
            ));
        }

        assert_eq!(window.events.len(), 5);
        let oldest = window.events.front().and_then(|event| event.task.as_deref());
        let newest = window.events.back().and_then(|event| event.task.as_deref());
        assert_eq!(oldest, Some("task-5"));
        assert_eq!(newest, Some("task-9"));
    }

    #[test]
    fn test_no_patterns_when_below_threshold() {
        let mut window = FailureWindow::new(20);
        window.push(&error_event("test_failure", "eng-1", "tests failed", 1));
        window.push(&escalation_event("eng-1", "101", 2));
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 3));

        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn test_non_failure_events_ignored() {
        let mut window = FailureWindow::new(20);
        let mut routed = TeamEvent::message_routed("manager", "eng-1");
        routed.ts = 2;
        window.push(&TeamEvent::daemon_started());
        window.push(&routed);

        assert!(window.events.is_empty());
        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn notification_threshold_triggers() {
        let patterns = [pattern(
            PatternType::RepeatedTestFailure,
            3,
            &["eng-1"],
            "eng-1 failing repeatedly",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert_eq!(notifications[0].frequency, 3);
    }

    #[test]
    fn notification_below_threshold_is_silent() {
        let patterns = [pattern(
            PatternType::RepeatedTestFailure,
            2,
            &["eng-1"],
            "eng-1 failing repeatedly",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert!(notifications.is_empty());
    }

    #[test]
    fn severity_routes_to_architect() {
        let patterns = [pattern(
            PatternType::MergeConflictRecurrence,
            5,
            &["201", "202"],
            "conflicts recurring",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_architect);
    }

    #[test]
    fn below_severity_routes_to_manager_only() {
        let patterns = [pattern(
            PatternType::EscalationCluster,
            4,
            &["101", "102"],
            "escalations piling up",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert!(!notifications[0].notify_architect);
    }

    #[test]
    fn notification_message_is_actionable() {
        let patterns = [pattern(
            PatternType::MergeConflictRecurrence,
            4,
            &["201"],
            "conflicts recurring",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        let message = notifications[0].message.to_ascii_lowercase();
        for keyword in ["pause", "rebase", "fix"] {
            assert!(message.contains(keyword));
        }
    }

    #[test]
    fn tracker_suppresses_repeated_notifications_without_new_failures() {
        let mut tracker = FailureTracker::new(20);
        for ts in 1..=3 {
            tracker.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }

        let initial = tracker.pattern_notifications(3, 5);
        let repeated = tracker.pattern_notifications(3, 5);

        assert_eq!(initial.len(), 1);
        assert!(repeated.is_empty());
    }

    #[test]
    fn tracker_reemits_when_pattern_frequency_increases() {
        let mut tracker = FailureTracker::new(20);
        for ts in 1..=3 {
            tracker.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }
        assert_eq!(tracker.pattern_notifications(3, 5).len(), 1);

        tracker.push(&error_event("test_failure", "eng-1", "tests failed", 4));

        let notifications = tracker.pattern_notifications(3, 5);

        assert_eq!(notifications.len(), 1);
        assert_eq!(notifications[0].frequency, 4);
    }
}