Skip to main content

batty_cli/team/
failure_patterns.rs

1//! Rolling failure-signature detection over recent team events.
2
3use std::collections::{HashMap, VecDeque};
4
5use super::events::TeamEvent;
6
/// Number of failure-relevant events a `FailureWindow` retains when it is
/// constructed with a window size of `0`.
const DEFAULT_WINDOW_SIZE: usize = 20;
/// Minimum pattern frequency that produces a notification when a caller passes
/// `0` as the notification threshold.
const DEFAULT_NOTIFICATION_THRESHOLD: u32 = 3;
/// Pattern frequency at or above which a notification is also routed to the
/// architect when a caller passes `0` as the severity threshold.
const DEFAULT_SEVERITY_THRESHOLD: u32 = 5;
10
/// Category of recurring failure signature found inside a failure window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PatternType {
    /// One role produced two or more error-carrying events (any error, not only test runs).
    RepeatedTestFailure,
    /// Two or more `task_escalated` events were observed.
    EscalationCluster,
    /// Two or more events mention "conflict" in their event type or error text.
    MergeConflictRecurrence,
}

impl PatternType {
    /// Stable snake_case identifier for this pattern type.
    ///
    /// Also used as the prefix of the de-duplication signature for notifications.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::RepeatedTestFailure => "repeated_test_failure",
            Self::EscalationCluster => "escalation_cluster",
            Self::MergeConflictRecurrence => "merge_conflict_recurrence",
        }
    }
}
27
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct PatternMatch {
30    pub pattern_type: PatternType,
31    pub frequency: u32,
32    pub affected_entities: Vec<String>,
33    pub description: String,
34}
35
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub struct PatternNotification {
38    pub message: String,
39    pub notify_manager: bool,
40    pub notify_architect: bool,
41    pub pattern_type: PatternType,
42    pub frequency: u32,
43}
44
45pub struct FailureWindow {
46    events: VecDeque<FailureEvent>,
47    window_size: usize,
48}
49
50pub struct FailureTracker {
51    window: FailureWindow,
52    last_notifications: HashMap<String, u32>,
53}
54
/// The subset of a `TeamEvent` that failure-pattern detection needs.
///
/// Fields are module-private, matching the struct's own visibility; this
/// module and its child test module can still read them directly.
#[derive(Debug, Clone)]
struct FailureEvent {
    /// The originating event's name (e.g. `task_escalated`).
    event_type: String,
    /// Role attributed to the event, if any.
    role: Option<String>,
    /// Task id attached to the event, if any.
    task: Option<String>,
    /// Error text carried by the event, if any.
    error: Option<String>,
    /// Timestamp copied from the originating event.
    ts: u64,
}
63
64impl FailureWindow {
65    pub fn new(window_size: usize) -> Self {
66        Self {
67            events: VecDeque::new(),
68            window_size: if window_size == 0 {
69                DEFAULT_WINDOW_SIZE
70            } else {
71                window_size
72            },
73        }
74    }
75
76    pub fn push(&mut self, event: &TeamEvent) {
77        if !is_failure_relevant(event) {
78            return;
79        }
80
81        self.events.push_back(FailureEvent {
82            event_type: event.event.clone(),
83            role: event.role.clone(),
84            task: event.task.clone(),
85            error: event.error.clone(),
86            ts: event.ts,
87        });
88
89        while self.events.len() > self.window_size {
90            self.events.pop_front();
91        }
92    }
93
94    pub fn detect_failure_patterns(&self) -> Vec<PatternMatch> {
95        let mut patterns = Vec::new();
96
97        let mut error_counts: HashMap<String, u32> = HashMap::new();
98        for event in &self.events {
99            if event.error.is_some() {
100                if let Some(role) = event.role.as_deref() {
101                    *error_counts.entry(role.to_string()).or_insert(0) += 1;
102                }
103            }
104        }
105        for (role, frequency) in error_counts {
106            if frequency >= 2 {
107                patterns.push(PatternMatch {
108                    pattern_type: PatternType::RepeatedTestFailure,
109                    frequency,
110                    affected_entities: vec![role.clone()],
111                    description: format!(
112                        "{role} hit {frequency} error events in the current failure window"
113                    ),
114                });
115            }
116        }
117
118        let escalation_events: Vec<&FailureEvent> = self
119            .events
120            .iter()
121            .filter(|event| event.event_type == "task_escalated")
122            .collect();
123        if escalation_events.len() >= 2 {
124            let mut affected_entities: Vec<String> = escalation_events
125                .iter()
126                .filter_map(|event| event.task.clone())
127                .collect();
128            affected_entities.sort();
129            affected_entities.dedup();
130            patterns.push(PatternMatch {
131                pattern_type: PatternType::EscalationCluster,
132                frequency: escalation_events.len() as u32,
133                affected_entities,
134                description: format!(
135                    "{} escalation events detected in the current failure window",
136                    escalation_events.len()
137                ),
138            });
139        }
140
141        let conflict_events: Vec<&FailureEvent> = self
142            .events
143            .iter()
144            .filter(|event| {
145                contains_conflict(&event.event_type)
146                    || event.error.as_deref().is_some_and(contains_conflict)
147            })
148            .collect();
149        if conflict_events.len() >= 2 {
150            let mut affected_entities: Vec<String> = conflict_events
151                .iter()
152                .filter_map(|event| {
153                    event
154                        .task
155                        .clone()
156                        .or_else(|| event.role.clone())
157                        .or_else(|| Some(event.ts.to_string()))
158                })
159                .collect();
160            affected_entities.sort();
161            affected_entities.dedup();
162            patterns.push(PatternMatch {
163                pattern_type: PatternType::MergeConflictRecurrence,
164                frequency: conflict_events.len() as u32,
165                affected_entities,
166                description: format!(
167                    "{} conflict-related events detected in the current failure window",
168                    conflict_events.len()
169                ),
170            });
171        }
172
173        patterns.sort_by(|left, right| {
174            right
175                .frequency
176                .cmp(&left.frequency)
177                .then_with(|| {
178                    pattern_sort_key(&left.pattern_type).cmp(&pattern_sort_key(&right.pattern_type))
179                })
180                .then_with(|| left.description.cmp(&right.description))
181        });
182        patterns
183    }
184}
185
186impl FailureTracker {
187    pub fn new(window_size: usize) -> Self {
188        Self {
189            window: FailureWindow::new(window_size),
190            last_notifications: HashMap::new(),
191        }
192    }
193
194    pub fn push(&mut self, event: &TeamEvent) {
195        self.window.push(event);
196    }
197
198    pub fn pattern_notifications(
199        &mut self,
200        notification_threshold: u32,
201        severity_threshold: u32,
202    ) -> Vec<PatternNotification> {
203        let (notification_threshold, severity_threshold) =
204            normalized_thresholds(notification_threshold, severity_threshold);
205
206        self.window
207            .detect_failure_patterns()
208            .into_iter()
209            .filter(|pattern| pattern.frequency >= notification_threshold)
210            .filter_map(|pattern| {
211                let signature = pattern_signature(&pattern);
212                let last_frequency = self
213                    .last_notifications
214                    .get(&signature)
215                    .copied()
216                    .unwrap_or(0);
217                if pattern.frequency <= last_frequency {
218                    return None;
219                }
220                self.last_notifications.insert(signature, pattern.frequency);
221                Some(build_pattern_notification(&pattern, severity_threshold))
222            })
223            .collect()
224    }
225}
226
227fn pattern_sort_key(pattern_type: &PatternType) -> u8 {
228    match pattern_type {
229        PatternType::RepeatedTestFailure => 0,
230        PatternType::EscalationCluster => 1,
231        PatternType::MergeConflictRecurrence => 2,
232    }
233}
234
235fn is_failure_relevant(event: &TeamEvent) -> bool {
236    event.event == "task_escalated"
237        || event.error.is_some()
238        || contains_failure_keyword(&event.event)
239}
240
/// ASCII-case-insensitively checks whether `value` mentions "fail" or "conflict".
fn contains_failure_keyword(value: &str) -> bool {
    let lowered = value.to_ascii_lowercase();
    ["fail", "conflict"]
        .iter()
        .any(|keyword| lowered.contains(keyword))
}
245
/// ASCII-case-insensitively checks whether `value` mentions "conflict".
fn contains_conflict(value: &str) -> bool {
    // The needle is pure ASCII and ASCII bytes never occur inside multi-byte UTF-8
    // sequences, so a byte-window comparison matches `to_ascii_lowercase().contains(..)`
    // without allocating.
    const NEEDLE: &[u8] = b"conflict";
    value
        .as_bytes()
        .windows(NEEDLE.len())
        .any(|window| window.eq_ignore_ascii_case(NEEDLE))
}
249
250#[cfg_attr(not(test), allow(dead_code))]
251pub fn generate_pattern_notifications(
252    patterns: &[PatternMatch],
253    notification_threshold: u32,
254    severity_threshold: u32,
255) -> Vec<PatternNotification> {
256    let (notification_threshold, severity_threshold) =
257        normalized_thresholds(notification_threshold, severity_threshold);
258
259    patterns
260        .iter()
261        .filter(|pattern| pattern.frequency >= notification_threshold)
262        .map(|pattern| build_pattern_notification(pattern, severity_threshold))
263        .collect()
264}
265
266fn normalized_thresholds(notification_threshold: u32, severity_threshold: u32) -> (u32, u32) {
267    let notification_threshold = if notification_threshold == 0 {
268        DEFAULT_NOTIFICATION_THRESHOLD
269    } else {
270        notification_threshold
271    };
272    let severity_threshold = if severity_threshold == 0 {
273        DEFAULT_SEVERITY_THRESHOLD
274    } else {
275        severity_threshold
276    };
277    (notification_threshold, severity_threshold)
278}
279
280fn build_pattern_notification(
281    pattern: &PatternMatch,
282    severity_threshold: u32,
283) -> PatternNotification {
284    PatternNotification {
285        message: pattern_notification_message(pattern),
286        notify_manager: true,
287        notify_architect: pattern.frequency >= severity_threshold,
288        pattern_type: pattern.pattern_type.clone(),
289        frequency: pattern.frequency,
290    }
291}
292
293fn pattern_signature(pattern: &PatternMatch) -> String {
294    format!(
295        "{}:{}",
296        pattern.pattern_type.as_str(),
297        pattern.affected_entities.join(",")
298    )
299}
300
301fn pattern_notification_message(pattern: &PatternMatch) -> String {
302    let affected = format_affected_entities(&pattern.affected_entities);
303    match pattern.pattern_type {
304        PatternType::RepeatedTestFailure => format!(
305            "Repeated test failures for {affected} ({}) in the recent window. Review the failing runs and stabilize before retrying.",
306            pattern.frequency
307        ),
308        PatternType::EscalationCluster => format!(
309            "Escalations clustered across {affected} ({} total). Review blockers and rebalance or unblock the work.",
310            pattern.frequency
311        ),
312        PatternType::MergeConflictRecurrence => format!(
313            "Merge conflicts keep recurring across {affected} ({} total). Pause merges, rebase branches, and fix the shared hotspot.",
314            pattern.frequency
315        ),
316    }
317}
318
/// Renders entity names as a `", "`-separated list, or "recent work" when empty.
fn format_affected_entities(entities: &[String]) -> String {
    match entities {
        [] => String::from("recent work"),
        names => names.join(", "),
    }
}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TeamEvent` attributed to `role` with every optional field unset
    /// except `task` and `error`.
    fn team_event(
        event: &str,
        role: &str,
        task: Option<&str>,
        error: Option<&str>,
        ts: u64,
    ) -> TeamEvent {
        TeamEvent {
            event: event.to_string(),
            role: Some(role.to_string()),
            task: task.map(str::to_string),
            recipient: None,
            from: None,
            to: None,
            restart: None,
            restart_count: None,
            load: None,
            working_members: None,
            total_members: None,
            session_running: None,
            reason: None,
            step: None,
            error: error.map(str::to_string),
            uptime_secs: None,
            session_size_bytes: None,
            output_bytes: None,
            ts,
        }
    }

    /// An error-carrying event with no task attached.
    fn error_event(event: &str, role: &str, error: &str, ts: u64) -> TeamEvent {
        team_event(event, role, None, Some(error), ts)
    }

    /// An event tied to `task`, optionally carrying an error.
    fn conflict_event(
        event: &str,
        role: &str,
        task: &str,
        error: Option<&str>,
        ts: u64,
    ) -> TeamEvent {
        team_event(event, role, Some(task), error, ts)
    }

    /// A `task_escalated` event stamped with `ts`.
    fn escalation(role: &str, task: &str, ts: u64) -> TeamEvent {
        let mut event = TeamEvent::task_escalated(role, task, None);
        event.ts = ts;
        event
    }

    /// Runs detection, asserts exactly one pattern was found, and returns it.
    fn single_pattern(window: &FailureWindow) -> PatternMatch {
        let mut patterns = window.detect_failure_patterns();
        assert_eq!(patterns.len(), 1, "unexpected patterns: {patterns:?}");
        patterns.remove(0)
    }

    /// Notifications for one synthetic pattern using thresholds 3 (notify) and 5 (severity).
    fn notifications_for(
        pattern_type: PatternType,
        frequency: u32,
        entities: &[&str],
        description: &str,
    ) -> Vec<PatternNotification> {
        let pattern = PatternMatch {
            pattern_type,
            frequency,
            affected_entities: entities.iter().map(|&entity| entity.to_string()).collect(),
            description: description.to_string(),
        };
        generate_pattern_notifications(&[pattern], 3, 5)
    }

    /// A tracker (window of 20) that has seen `count` eng-1 test failures at ts 1..=count.
    fn tracker_with_failures(count: u64) -> FailureTracker {
        let mut tracker = FailureTracker::new(20);
        for ts in 1..=count {
            tracker.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }
        tracker
    }

    #[test]
    fn test_detect_repeated_test_failures() {
        let mut window = FailureWindow::new(20);
        for ts in 1..=3 {
            window.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }

        let pattern = single_pattern(&window);

        assert_eq!(pattern.pattern_type, PatternType::RepeatedTestFailure);
        assert_eq!(pattern.frequency, 3);
        assert_eq!(pattern.affected_entities, vec!["eng-1".to_string()]);
    }

    #[test]
    fn test_detect_escalation_cluster() {
        let mut window = FailureWindow::new(20);
        window.push(&escalation("eng-1", "101", 1));
        window.push(&escalation("eng-2", "102", 2));

        let pattern = single_pattern(&window);

        assert_eq!(pattern.pattern_type, PatternType::EscalationCluster);
        assert_eq!(pattern.frequency, 2);
        assert_eq!(
            pattern.affected_entities,
            vec!["101".to_string(), "102".to_string()]
        );
    }

    #[test]
    fn test_detect_merge_conflict_recurrence() {
        let mut window = FailureWindow::new(20);
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 1));
        window.push(&conflict_event(
            "loop_step_error",
            "eng-1",
            "202",
            Some("rebase conflict on main"),
            2,
        ));

        let pattern = single_pattern(&window);

        assert_eq!(pattern.pattern_type, PatternType::MergeConflictRecurrence);
        assert_eq!(pattern.frequency, 2);
        assert_eq!(
            pattern.affected_entities,
            vec!["201".to_string(), "202".to_string()]
        );
    }

    #[test]
    fn test_window_rollover() {
        let mut window = FailureWindow::new(5);
        for index in 0..10u64 {
            let task = format!("task-{index}");
            window.push(&conflict_event("merge_conflict", "eng-1", &task, None, index));
        }

        let oldest = window.events.front().and_then(|event| event.task.as_deref());
        let newest = window.events.back().and_then(|event| event.task.as_deref());

        assert_eq!(window.events.len(), 5);
        assert_eq!(oldest, Some("task-5"));
        assert_eq!(newest, Some("task-9"));
    }

    #[test]
    fn test_no_patterns_when_below_threshold() {
        let mut window = FailureWindow::new(20);
        window.push(&error_event("test_failure", "eng-1", "tests failed", 1));
        window.push(&escalation("eng-1", "101", 2));
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 3));

        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn test_non_failure_events_ignored() {
        let mut window = FailureWindow::new(20);
        let daemon = TeamEvent::daemon_started();
        let routed = {
            let mut event = TeamEvent::message_routed("manager", "eng-1");
            event.ts = 2;
            event
        };
        for event in [&daemon, &routed] {
            window.push(event);
        }

        assert!(window.events.is_empty());
        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn notification_threshold_triggers() {
        let notifications = notifications_for(
            PatternType::RepeatedTestFailure,
            3,
            &["eng-1"],
            "eng-1 failing repeatedly",
        );

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert_eq!(notifications[0].frequency, 3);
    }

    #[test]
    fn notification_below_threshold_is_silent() {
        let notifications = notifications_for(
            PatternType::RepeatedTestFailure,
            2,
            &["eng-1"],
            "eng-1 failing repeatedly",
        );

        assert!(notifications.is_empty());
    }

    #[test]
    fn severity_routes_to_architect() {
        let notifications = notifications_for(
            PatternType::MergeConflictRecurrence,
            5,
            &["201", "202"],
            "conflicts recurring",
        );

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_architect);
    }

    #[test]
    fn below_severity_routes_to_manager_only() {
        let notifications = notifications_for(
            PatternType::EscalationCluster,
            4,
            &["101", "102"],
            "escalations piling up",
        );

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert!(!notifications[0].notify_architect);
    }

    #[test]
    fn notification_message_is_actionable() {
        let notifications = notifications_for(
            PatternType::MergeConflictRecurrence,
            4,
            &["201"],
            "conflicts recurring",
        );

        let message = notifications[0].message.to_ascii_lowercase();
        for verb in ["pause", "rebase", "fix"] {
            assert!(message.contains(verb), "missing {verb:?} in {message:?}");
        }
    }

    #[test]
    fn tracker_suppresses_repeated_notifications_without_new_failures() {
        let mut tracker = tracker_with_failures(3);

        let initial = tracker.pattern_notifications(3, 5);
        let repeated = tracker.pattern_notifications(3, 5);

        assert_eq!(initial.len(), 1);
        assert!(repeated.is_empty());
    }

    #[test]
    fn tracker_reemits_when_pattern_frequency_increases() {
        let mut tracker = tracker_with_failures(3);
        assert_eq!(tracker.pattern_notifications(3, 5).len(), 1);

        tracker.push(&error_event("test_failure", "eng-1", "tests failed", 4));
        let notifications = tracker.pattern_notifications(3, 5);

        assert_eq!(notifications.len(), 1);
        assert_eq!(notifications[0].frequency, 4);
    }
}