Skip to main content

batty_cli/team/
failure_patterns.rs

1//! Rolling failure-signature detection over recent team events.
2
3use std::collections::{HashMap, VecDeque};
4
5use super::events::TeamEvent;
6
/// Window capacity substituted when `FailureWindow::new` is called with `0`.
const DEFAULT_WINDOW_SIZE: usize = 20;
/// Minimum pattern frequency for emitting a notification, substituted when a
/// caller passes `0` as the notification threshold.
const DEFAULT_NOTIFICATION_THRESHOLD: u32 = 3;
/// Frequency at or above which the architect is notified as well, substituted
/// when a caller passes `0` as the severity threshold.
const DEFAULT_SEVERITY_THRESHOLD: u32 = 5;
10
/// Kinds of failure signature that can be reported for the rolling window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PatternType {
    /// A single role accumulated several error-carrying events.
    RepeatedTestFailure,
    /// Several `task_escalated` events are present in the window.
    EscalationCluster,
    /// Several conflict-related events are present in the window.
    MergeConflictRecurrence,
}

impl PatternType {
    /// Returns the snake_case identifier for this pattern kind.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::RepeatedTestFailure => "repeated_test_failure",
            Self::EscalationCluster => "escalation_cluster",
            Self::MergeConflictRecurrence => "merge_conflict_recurrence",
        }
    }
}
27
/// A failure signature detected in the current window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PatternMatch {
    /// Which kind of signature was detected.
    pub pattern_type: PatternType,
    /// Number of events in the window that contributed to this match.
    pub frequency: u32,
    /// Identifiers tied to the match: the role for repeated failures, task ids
    /// for escalations, and task / role / timestamp (first one available) for
    /// conflicts.
    pub affected_entities: Vec<String>,
    /// Human-readable summary of the match.
    pub description: String,
}
35
/// A notification derived from a [`PatternMatch`], with routing flags.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PatternNotification {
    /// Notification text describing the pattern and a suggested action.
    pub message: String,
    /// Whether the manager should receive this notification.
    pub notify_manager: bool,
    /// Whether the architect should receive this notification; set when the
    /// pattern frequency reaches the severity threshold.
    pub notify_architect: bool,
    /// Kind of the pattern this notification was built from.
    pub pattern_type: PatternType,
    /// Frequency of the pattern this notification was built from.
    pub frequency: u32,
}
44
/// Bounded, oldest-first buffer of recent failure-relevant events.
#[derive(Debug, Clone)]
pub struct FailureWindow {
    /// Retained events, oldest at the front.
    events: VecDeque<FailureEvent>,
    /// Maximum number of events kept in `events`.
    window_size: usize,
}

/// Wraps a [`FailureWindow`] and remembers what has already been reported so
/// unchanged patterns are not notified twice.
#[derive(Debug, Clone)]
pub struct FailureTracker {
    /// Window that incoming events are recorded into.
    window: FailureWindow,
    /// Last notified frequency, keyed by pattern signature.
    last_notifications: HashMap<String, u32>,
}

/// The subset of a team event's fields that pattern detection looks at.
#[derive(Debug, Clone)]
struct FailureEvent {
    /// Event name as recorded on the originating team event.
    pub event_type: String,
    /// Role attached to the event, if any.
    pub role: Option<String>,
    /// Task attached to the event, if any.
    pub task: Option<String>,
    /// Error text attached to the event, if any.
    pub error: Option<String>,
    /// Timestamp copied from the originating event.
    pub ts: u64,
}
63
64impl FailureWindow {
65    pub fn new(window_size: usize) -> Self {
66        Self {
67            events: VecDeque::new(),
68            window_size: if window_size == 0 {
69                DEFAULT_WINDOW_SIZE
70            } else {
71                window_size
72            },
73        }
74    }
75
76    pub fn push(&mut self, event: &TeamEvent) {
77        if !is_failure_relevant(event) {
78            return;
79        }
80
81        self.events.push_back(FailureEvent {
82            event_type: event.event.clone(),
83            role: event.role.clone(),
84            task: event.task.clone(),
85            error: event.error.clone(),
86            ts: event.ts,
87        });
88
89        while self.events.len() > self.window_size {
90            self.events.pop_front();
91        }
92    }
93
94    pub fn detect_failure_patterns(&self) -> Vec<PatternMatch> {
95        let mut patterns = Vec::new();
96
97        let mut error_counts: HashMap<String, u32> = HashMap::new();
98        for event in &self.events {
99            if event.error.is_some() {
100                if let Some(role) = event.role.as_deref() {
101                    *error_counts.entry(role.to_string()).or_insert(0) += 1;
102                }
103            }
104        }
105        for (role, frequency) in error_counts {
106            if frequency >= 2 {
107                patterns.push(PatternMatch {
108                    pattern_type: PatternType::RepeatedTestFailure,
109                    frequency,
110                    affected_entities: vec![role.clone()],
111                    description: format!(
112                        "{role} hit {frequency} error events in the current failure window"
113                    ),
114                });
115            }
116        }
117
118        let escalation_events: Vec<&FailureEvent> = self
119            .events
120            .iter()
121            .filter(|event| event.event_type == "task_escalated")
122            .collect();
123        if escalation_events.len() >= 2 {
124            let mut affected_entities: Vec<String> = escalation_events
125                .iter()
126                .filter_map(|event| event.task.clone())
127                .collect();
128            affected_entities.sort();
129            affected_entities.dedup();
130            patterns.push(PatternMatch {
131                pattern_type: PatternType::EscalationCluster,
132                frequency: escalation_events.len() as u32,
133                affected_entities,
134                description: format!(
135                    "{} escalation events detected in the current failure window",
136                    escalation_events.len()
137                ),
138            });
139        }
140
141        let conflict_events: Vec<&FailureEvent> = self
142            .events
143            .iter()
144            .filter(|event| {
145                contains_conflict(&event.event_type)
146                    || event.error.as_deref().is_some_and(contains_conflict)
147            })
148            .collect();
149        if conflict_events.len() >= 2 {
150            let mut affected_entities: Vec<String> = conflict_events
151                .iter()
152                .filter_map(|event| {
153                    event
154                        .task
155                        .clone()
156                        .or_else(|| event.role.clone())
157                        .or_else(|| Some(event.ts.to_string()))
158                })
159                .collect();
160            affected_entities.sort();
161            affected_entities.dedup();
162            patterns.push(PatternMatch {
163                pattern_type: PatternType::MergeConflictRecurrence,
164                frequency: conflict_events.len() as u32,
165                affected_entities,
166                description: format!(
167                    "{} conflict-related events detected in the current failure window",
168                    conflict_events.len()
169                ),
170            });
171        }
172
173        patterns.sort_by(|left, right| {
174            right
175                .frequency
176                .cmp(&left.frequency)
177                .then_with(|| {
178                    pattern_sort_key(&left.pattern_type).cmp(&pattern_sort_key(&right.pattern_type))
179                })
180                .then_with(|| left.description.cmp(&right.description))
181        });
182        patterns
183    }
184}
185
186impl FailureTracker {
187    pub fn new(window_size: usize) -> Self {
188        Self {
189            window: FailureWindow::new(window_size),
190            last_notifications: HashMap::new(),
191        }
192    }
193
194    pub fn push(&mut self, event: &TeamEvent) {
195        self.window.push(event);
196    }
197
198    pub fn pattern_notifications(
199        &mut self,
200        notification_threshold: u32,
201        severity_threshold: u32,
202    ) -> Vec<PatternNotification> {
203        let (notification_threshold, severity_threshold) =
204            normalized_thresholds(notification_threshold, severity_threshold);
205
206        self.window
207            .detect_failure_patterns()
208            .into_iter()
209            .filter(|pattern| pattern.frequency >= notification_threshold)
210            .filter_map(|pattern| {
211                let signature = pattern_signature(&pattern);
212                let last_frequency = self
213                    .last_notifications
214                    .get(&signature)
215                    .copied()
216                    .unwrap_or(0);
217                if pattern.frequency <= last_frequency {
218                    return None;
219                }
220                self.last_notifications.insert(signature, pattern.frequency);
221                Some(build_pattern_notification(&pattern, severity_threshold))
222            })
223            .collect()
224    }
225}
226
227fn pattern_sort_key(pattern_type: &PatternType) -> u8 {
228    match pattern_type {
229        PatternType::RepeatedTestFailure => 0,
230        PatternType::EscalationCluster => 1,
231        PatternType::MergeConflictRecurrence => 2,
232    }
233}
234
235fn is_failure_relevant(event: &TeamEvent) -> bool {
236    event.event == "task_escalated"
237        || event.error.is_some()
238        || contains_failure_keyword(&event.event)
239}
240
/// Returns `true` when `value` contains "fail" or "conflict", ignoring ASCII case.
fn contains_failure_keyword(value: &str) -> bool {
    let lowered = value.to_ascii_lowercase();
    ["fail", "conflict"]
        .iter()
        .any(|keyword| lowered.contains(*keyword))
}
245
/// Returns `true` when `value` contains "conflict", ignoring ASCII case.
fn contains_conflict(value: &str) -> bool {
    const NEEDLE: &[u8] = b"conflict";
    // The needle is pure ASCII, so a byte-level scan matches exactly the
    // same inputs as lowercasing the string and searching it.
    value
        .as_bytes()
        .windows(NEEDLE.len())
        .any(|chunk| chunk.eq_ignore_ascii_case(NEEDLE))
}
249
250#[cfg_attr(not(test), allow(dead_code))]
251pub fn generate_pattern_notifications(
252    patterns: &[PatternMatch],
253    notification_threshold: u32,
254    severity_threshold: u32,
255) -> Vec<PatternNotification> {
256    let (notification_threshold, severity_threshold) =
257        normalized_thresholds(notification_threshold, severity_threshold);
258
259    patterns
260        .iter()
261        .filter(|pattern| pattern.frequency >= notification_threshold)
262        .map(|pattern| build_pattern_notification(pattern, severity_threshold))
263        .collect()
264}
265
266fn normalized_thresholds(notification_threshold: u32, severity_threshold: u32) -> (u32, u32) {
267    let notification_threshold = if notification_threshold == 0 {
268        DEFAULT_NOTIFICATION_THRESHOLD
269    } else {
270        notification_threshold
271    };
272    let severity_threshold = if severity_threshold == 0 {
273        DEFAULT_SEVERITY_THRESHOLD
274    } else {
275        severity_threshold
276    };
277    (notification_threshold, severity_threshold)
278}
279
280fn build_pattern_notification(
281    pattern: &PatternMatch,
282    severity_threshold: u32,
283) -> PatternNotification {
284    PatternNotification {
285        message: pattern_notification_message(pattern),
286        notify_manager: true,
287        notify_architect: pattern.frequency >= severity_threshold,
288        pattern_type: pattern.pattern_type.clone(),
289        frequency: pattern.frequency,
290    }
291}
292
293fn pattern_signature(pattern: &PatternMatch) -> String {
294    format!(
295        "{}:{}",
296        pattern.pattern_type.as_str(),
297        pattern.affected_entities.join(",")
298    )
299}
300
301fn pattern_notification_message(pattern: &PatternMatch) -> String {
302    let affected = format_affected_entities(&pattern.affected_entities);
303    match pattern.pattern_type {
304        PatternType::RepeatedTestFailure => format!(
305            "Repeated test failures for {affected} ({}) in the recent window. Review the failing runs and stabilize before retrying.",
306            pattern.frequency
307        ),
308        PatternType::EscalationCluster => format!(
309            "Escalations clustered across {affected} ({} total). Review blockers and rebalance or unblock the work.",
310            pattern.frequency
311        ),
312        PatternType::MergeConflictRecurrence => format!(
313            "Merge conflicts keep recurring across {affected} ({} total). Pause merges, rebase branches, and fix the shared hotspot.",
314            pattern.frequency
315        ),
316    }
317}
318
/// Joins `entities` with ", ", or returns "recent work" when there are none.
fn format_affected_entities(entities: &[String]) -> String {
    match entities {
        [] => "recent work".to_string(),
        listed => listed.join(", "),
    }
}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TeamEvent` with only the fields these tests care about set.
    fn failure_event(
        event: &str,
        role: &str,
        task: Option<&str>,
        error: Option<&str>,
        ts: u64,
    ) -> TeamEvent {
        TeamEvent {
            event: event.to_string(),
            role: Some(role.to_string()),
            task: task.map(str::to_string),
            recipient: None,
            from: None,
            to: None,
            restart: None,
            restart_count: None,
            load: None,
            working_members: None,
            total_members: None,
            session_running: None,
            reason: None,
            step: None,
            error: error.map(str::to_string),
            uptime_secs: None,
            session_size_bytes: None,
            ts,
        }
    }

    /// An error-carrying event with no task attached.
    fn error_event(event: &str, role: &str, error: &str, ts: u64) -> TeamEvent {
        failure_event(event, role, None, Some(error), ts)
    }

    /// A task-bound event with an optional error message.
    fn conflict_event(
        event: &str,
        role: &str,
        task: &str,
        error: Option<&str>,
        ts: u64,
    ) -> TeamEvent {
        failure_event(event, role, Some(task), error, ts)
    }

    /// A `task_escalated` event stamped with `ts`.
    fn escalation_event(role: &str, task: &str, ts: u64) -> TeamEvent {
        let mut event = TeamEvent::task_escalated(role, task, None);
        event.ts = ts;
        event
    }

    /// A hand-built `PatternMatch` for the notification tests.
    fn sample_pattern(
        pattern_type: PatternType,
        frequency: u32,
        entities: &[&str],
        description: &str,
    ) -> PatternMatch {
        PatternMatch {
            pattern_type,
            frequency,
            affected_entities: entities.iter().map(|entity| entity.to_string()).collect(),
            description: description.to_string(),
        }
    }

    /// A tracker that has already seen three test failures from `eng-1`.
    fn tracker_with_three_failures() -> FailureTracker {
        let mut tracker = FailureTracker::new(20);
        for ts in 1..=3 {
            tracker.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }
        tracker
    }

    #[test]
    fn test_detect_repeated_test_failures() {
        let mut window = FailureWindow::new(20);
        for ts in 1..=3 {
            window.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let only = &patterns[0];
        assert_eq!(only.pattern_type, PatternType::RepeatedTestFailure);
        assert_eq!(only.frequency, 3);
        assert_eq!(only.affected_entities, vec!["eng-1".to_string()]);
    }

    #[test]
    fn test_detect_escalation_cluster() {
        let mut window = FailureWindow::new(20);
        window.push(&escalation_event("eng-1", "101", 1));
        window.push(&escalation_event("eng-2", "102", 2));

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let only = &patterns[0];
        assert_eq!(only.pattern_type, PatternType::EscalationCluster);
        assert_eq!(only.frequency, 2);
        assert_eq!(
            only.affected_entities,
            vec!["101".to_string(), "102".to_string()]
        );
    }

    #[test]
    fn test_detect_merge_conflict_recurrence() {
        let mut window = FailureWindow::new(20);
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 1));
        window.push(&conflict_event(
            "loop_step_error",
            "eng-1",
            "202",
            Some("rebase conflict on main"),
            2,
        ));

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let only = &patterns[0];
        assert_eq!(only.pattern_type, PatternType::MergeConflictRecurrence);
        assert_eq!(only.frequency, 2);
        assert_eq!(
            only.affected_entities,
            vec!["201".to_string(), "202".to_string()]
        );
    }

    #[test]
    fn test_window_rollover() {
        let mut window = FailureWindow::new(5);
        for index in 0..10 {
            let task = format!("task-{index}");
            window.push(&conflict_event(
                "merge_conflict",
                "eng-1",
                &task,
                None,
                index,
            ));
        }

        assert_eq!(window.events.len(), 5);

        let oldest = window
            .events
            .front()
            .and_then(|event| event.task.as_deref());
        assert_eq!(oldest, Some("task-5"));

        let newest = window.events.back().and_then(|event| event.task.as_deref());
        assert_eq!(newest, Some("task-9"));
    }

    #[test]
    fn test_no_patterns_when_below_threshold() {
        let mut window = FailureWindow::new(20);
        window.push(&error_event("test_failure", "eng-1", "tests failed", 1));
        window.push(&escalation_event("eng-1", "101", 2));
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 3));

        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn test_non_failure_events_ignored() {
        let mut window = FailureWindow::new(20);
        let mut routed = TeamEvent::message_routed("manager", "eng-1");
        routed.ts = 2;
        window.push(&TeamEvent::daemon_started());
        window.push(&routed);

        assert!(window.events.is_empty());
        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn notification_threshold_triggers() {
        let patterns = [sample_pattern(
            PatternType::RepeatedTestFailure,
            3,
            &["eng-1"],
            "eng-1 failing repeatedly",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert_eq!(notifications[0].frequency, 3);
    }

    #[test]
    fn notification_below_threshold_is_silent() {
        let patterns = [sample_pattern(
            PatternType::RepeatedTestFailure,
            2,
            &["eng-1"],
            "eng-1 failing repeatedly",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert!(notifications.is_empty());
    }

    #[test]
    fn severity_routes_to_architect() {
        let patterns = [sample_pattern(
            PatternType::MergeConflictRecurrence,
            5,
            &["201", "202"],
            "conflicts recurring",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_architect);
    }

    #[test]
    fn below_severity_routes_to_manager_only() {
        let patterns = [sample_pattern(
            PatternType::EscalationCluster,
            4,
            &["101", "102"],
            "escalations piling up",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert!(!notifications[0].notify_architect);
    }

    #[test]
    fn notification_message_is_actionable() {
        let patterns = [sample_pattern(
            PatternType::MergeConflictRecurrence,
            4,
            &["201"],
            "conflicts recurring",
        )];

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        let message = notifications[0].message.to_ascii_lowercase();
        for keyword in ["pause", "rebase", "fix"] {
            assert!(message.contains(keyword));
        }
    }

    #[test]
    fn tracker_suppresses_repeated_notifications_without_new_failures() {
        let mut tracker = tracker_with_three_failures();

        let initial = tracker.pattern_notifications(3, 5);
        let repeated = tracker.pattern_notifications(3, 5);

        assert_eq!(initial.len(), 1);
        assert!(repeated.is_empty());
    }

    #[test]
    fn tracker_reemits_when_pattern_frequency_increases() {
        let mut tracker = tracker_with_three_failures();
        assert_eq!(tracker.pattern_notifications(3, 5).len(), 1);

        tracker.push(&error_event("test_failure", "eng-1", "tests failed", 4));

        let notifications = tracker.pattern_notifications(3, 5);

        assert_eq!(notifications.len(), 1);
        assert_eq!(notifications[0].frequency, 4);
    }
}
609
610        assert_eq!(notifications.len(), 1);
611        assert_eq!(notifications[0].frequency, 4);
612    }
613}