Skip to main content

batty_cli/team/
failure_patterns.rs

1//! Rolling failure-signature detection over recent team events.
2
3use std::collections::{HashMap, VecDeque};
4
5use super::events::TeamEvent;
6
/// Number of failure-relevant events retained when a caller asks for a window size of `0`.
const DEFAULT_WINDOW_SIZE: usize = 20;
/// Minimum pattern frequency that produces a notification when a caller passes `0`.
const DEFAULT_NOTIFICATION_THRESHOLD: u32 = 3;
/// Minimum pattern frequency that also routes a notification to the architect when a
/// caller passes `0`.
const DEFAULT_SEVERITY_THRESHOLD: u32 = 5;
10
/// Category of recurring failure signature detected in the rolling failure window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PatternType {
    /// A single role produced repeated error-carrying events.
    RepeatedTestFailure,
    /// Several `task_escalated` events landed in the window.
    EscalationCluster,
    /// Several events mention a conflict in their event type or error text.
    MergeConflictRecurrence,
}

impl PatternType {
    /// Returns the stable snake_case identifier for this pattern type.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::RepeatedTestFailure => "repeated_test_failure",
            Self::EscalationCluster => "escalation_cluster",
            Self::MergeConflictRecurrence => "merge_conflict_recurrence",
        }
    }
}
27
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct PatternMatch {
30    pub pattern_type: PatternType,
31    pub frequency: u32,
32    pub affected_entities: Vec<String>,
33    pub description: String,
34}
35
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub struct PatternNotification {
38    pub message: String,
39    pub notify_manager: bool,
40    pub notify_architect: bool,
41    pub pattern_type: PatternType,
42    pub frequency: u32,
43}
44
/// Bounded FIFO of the most recent failure-relevant team events.
pub struct FailureWindow {
    /// Retained events, oldest at the front.
    events: VecDeque<FailureEvent>,
    /// Maximum number of events kept before the oldest are evicted.
    window_size: usize,
}

/// A [`FailureWindow`] plus per-signature memory of the frequency last reported, so
/// repeated polls do not re-announce an unchanged pattern.
pub struct FailureTracker {
    window: FailureWindow,
    /// Pattern signature -> frequency at which it was last notified.
    last_notifications: HashMap<String, u32>,
}

/// Copy of the team-event fields that the pattern detectors inspect.
#[derive(Debug, Clone)]
struct FailureEvent {
    event_type: String,
    role: Option<String>,
    task: Option<String>,
    error: Option<String>,
    ts: u64,
}
63
64impl FailureWindow {
65    pub fn new(window_size: usize) -> Self {
66        Self {
67            events: VecDeque::new(),
68            window_size: if window_size == 0 {
69                DEFAULT_WINDOW_SIZE
70            } else {
71                window_size
72            },
73        }
74    }
75
76    pub fn push(&mut self, event: &TeamEvent) {
77        if !is_failure_relevant(event) {
78            return;
79        }
80
81        self.events.push_back(FailureEvent {
82            event_type: event.event.clone(),
83            role: event.role.clone(),
84            task: event.task.clone(),
85            error: event.error.clone(),
86            ts: event.ts,
87        });
88
89        while self.events.len() > self.window_size {
90            self.events.pop_front();
91        }
92    }
93
94    pub fn detect_failure_patterns(&self) -> Vec<PatternMatch> {
95        let mut patterns = Vec::new();
96
97        let mut error_counts: HashMap<String, u32> = HashMap::new();
98        for event in &self.events {
99            if event.error.is_some() {
100                if let Some(role) = event.role.as_deref() {
101                    *error_counts.entry(role.to_string()).or_insert(0) += 1;
102                }
103            }
104        }
105        for (role, frequency) in error_counts {
106            if frequency >= 2 {
107                patterns.push(PatternMatch {
108                    pattern_type: PatternType::RepeatedTestFailure,
109                    frequency,
110                    affected_entities: vec![role.clone()],
111                    description: format!(
112                        "{role} hit {frequency} error events in the current failure window"
113                    ),
114                });
115            }
116        }
117
118        let escalation_events: Vec<&FailureEvent> = self
119            .events
120            .iter()
121            .filter(|event| event.event_type == "task_escalated")
122            .collect();
123        if escalation_events.len() >= 2 {
124            let mut affected_entities: Vec<String> = escalation_events
125                .iter()
126                .filter_map(|event| event.task.clone())
127                .collect();
128            affected_entities.sort();
129            affected_entities.dedup();
130            patterns.push(PatternMatch {
131                pattern_type: PatternType::EscalationCluster,
132                frequency: escalation_events.len() as u32,
133                affected_entities,
134                description: format!(
135                    "{} escalation events detected in the current failure window",
136                    escalation_events.len()
137                ),
138            });
139        }
140
141        let conflict_events: Vec<&FailureEvent> = self
142            .events
143            .iter()
144            .filter(|event| {
145                contains_conflict(&event.event_type)
146                    || event.error.as_deref().is_some_and(contains_conflict)
147            })
148            .collect();
149        if conflict_events.len() >= 2 {
150            let mut affected_entities: Vec<String> = conflict_events
151                .iter()
152                .filter_map(|event| {
153                    event
154                        .task
155                        .clone()
156                        .or_else(|| event.role.clone())
157                        .or_else(|| Some(event.ts.to_string()))
158                })
159                .collect();
160            affected_entities.sort();
161            affected_entities.dedup();
162            patterns.push(PatternMatch {
163                pattern_type: PatternType::MergeConflictRecurrence,
164                frequency: conflict_events.len() as u32,
165                affected_entities,
166                description: format!(
167                    "{} conflict-related events detected in the current failure window",
168                    conflict_events.len()
169                ),
170            });
171        }
172
173        patterns.sort_by(|left, right| {
174            right
175                .frequency
176                .cmp(&left.frequency)
177                .then_with(|| {
178                    pattern_sort_key(&left.pattern_type).cmp(&pattern_sort_key(&right.pattern_type))
179                })
180                .then_with(|| left.description.cmp(&right.description))
181        });
182        patterns
183    }
184}
185
186impl FailureTracker {
187    pub fn new(window_size: usize) -> Self {
188        Self {
189            window: FailureWindow::new(window_size),
190            last_notifications: HashMap::new(),
191        }
192    }
193
194    pub fn push(&mut self, event: &TeamEvent) {
195        self.window.push(event);
196    }
197
198    pub fn pattern_notifications(
199        &mut self,
200        notification_threshold: u32,
201        severity_threshold: u32,
202    ) -> Vec<PatternNotification> {
203        let (notification_threshold, severity_threshold) =
204            normalized_thresholds(notification_threshold, severity_threshold);
205
206        self.window
207            .detect_failure_patterns()
208            .into_iter()
209            .filter(|pattern| pattern.frequency >= notification_threshold)
210            .filter_map(|pattern| {
211                let signature = pattern_signature(&pattern);
212                let last_frequency = self
213                    .last_notifications
214                    .get(&signature)
215                    .copied()
216                    .unwrap_or(0);
217                if pattern.frequency <= last_frequency {
218                    return None;
219                }
220                self.last_notifications.insert(signature, pattern.frequency);
221                Some(build_pattern_notification(&pattern, severity_threshold))
222            })
223            .collect()
224    }
225}
226
227fn pattern_sort_key(pattern_type: &PatternType) -> u8 {
228    match pattern_type {
229        PatternType::RepeatedTestFailure => 0,
230        PatternType::EscalationCluster => 1,
231        PatternType::MergeConflictRecurrence => 2,
232    }
233}
234
235fn is_failure_relevant(event: &TeamEvent) -> bool {
236    event.event == "task_escalated"
237        || event.error.is_some()
238        || contains_failure_keyword(&event.event)
239}
240
/// Returns whether `value` mentions "fail" or "conflict", ignoring ASCII case.
fn contains_failure_keyword(value: &str) -> bool {
    ["fail", "conflict"]
        .iter()
        .any(|needle| contains_ignore_ascii_case(value, needle))
}

/// Returns whether `value` mentions "conflict", ignoring ASCII case.
fn contains_conflict(value: &str) -> bool {
    contains_ignore_ascii_case(value, "conflict")
}

/// Substring search that compares ASCII letters case-insensitively and every other byte
/// exactly, without allocating a lowercased copy of `haystack`.
fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
    needle.is_empty()
        || haystack
            .as_bytes()
            .windows(needle.len())
            .any(|window| window.eq_ignore_ascii_case(needle.as_bytes()))
}
249
250#[cfg_attr(not(test), allow(dead_code))]
251pub fn generate_pattern_notifications(
252    patterns: &[PatternMatch],
253    notification_threshold: u32,
254    severity_threshold: u32,
255) -> Vec<PatternNotification> {
256    let (notification_threshold, severity_threshold) =
257        normalized_thresholds(notification_threshold, severity_threshold);
258
259    patterns
260        .iter()
261        .filter(|pattern| pattern.frequency >= notification_threshold)
262        .map(|pattern| build_pattern_notification(pattern, severity_threshold))
263        .collect()
264}
265
266fn normalized_thresholds(notification_threshold: u32, severity_threshold: u32) -> (u32, u32) {
267    let notification_threshold = if notification_threshold == 0 {
268        DEFAULT_NOTIFICATION_THRESHOLD
269    } else {
270        notification_threshold
271    };
272    let severity_threshold = if severity_threshold == 0 {
273        DEFAULT_SEVERITY_THRESHOLD
274    } else {
275        severity_threshold
276    };
277    (notification_threshold, severity_threshold)
278}
279
280fn build_pattern_notification(
281    pattern: &PatternMatch,
282    severity_threshold: u32,
283) -> PatternNotification {
284    PatternNotification {
285        message: pattern_notification_message(pattern),
286        notify_manager: true,
287        notify_architect: pattern.frequency >= severity_threshold,
288        pattern_type: pattern.pattern_type.clone(),
289        frequency: pattern.frequency,
290    }
291}
292
293fn pattern_signature(pattern: &PatternMatch) -> String {
294    format!(
295        "{}:{}",
296        pattern.pattern_type.as_str(),
297        pattern.affected_entities.join(",")
298    )
299}
300
301fn pattern_notification_message(pattern: &PatternMatch) -> String {
302    let affected = format_affected_entities(&pattern.affected_entities);
303    match pattern.pattern_type {
304        PatternType::RepeatedTestFailure => format!(
305            "Repeated test failures for {affected} ({}) in the recent window. Review the failing runs and stabilize before retrying.",
306            pattern.frequency
307        ),
308        PatternType::EscalationCluster => format!(
309            "Escalations clustered across {affected} ({} total). Review blockers and rebalance or unblock the work.",
310            pattern.frequency
311        ),
312        PatternType::MergeConflictRecurrence => format!(
313            "Merge conflicts keep recurring across {affected} ({} total). Pause merges, rebase branches, and fix the shared hotspot.",
314            pattern.frequency
315        ),
316    }
317}
318
/// Joins `entities` with `", "`, or returns the placeholder "recent work" when empty.
fn format_affected_entities(entities: &[String]) -> String {
    match entities {
        [] => "recent work".to_string(),
        _ => entities.join(", "),
    }
}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TeamEvent` with `event`, `role`, optional `task`/`error`, and `ts` set;
    /// every other field is `None`.
    fn make_event(
        event: &str,
        role: &str,
        task: Option<&str>,
        error: Option<&str>,
        ts: u64,
    ) -> TeamEvent {
        TeamEvent {
            event: event.to_string(),
            role: Some(role.to_string()),
            task: task.map(str::to_string),
            recipient: None,
            from: None,
            to: None,
            restart: None,
            restart_count: None,
            load: None,
            working_members: None,
            total_members: None,
            session_running: None,
            reason: None,
            step: None,
            error: error.map(str::to_string),
            uptime_secs: None,
            session_size_bytes: None,
            output_bytes: None,
            filename: None,
            content_hash: None,
            ts,
        }
    }

    /// Error-carrying event with no task.
    fn error_event(event: &str, role: &str, error: &str, ts: u64) -> TeamEvent {
        make_event(event, role, None, Some(error), ts)
    }

    /// Task-scoped event that may carry an error.
    fn conflict_event(
        event: &str,
        role: &str,
        task: &str,
        error: Option<&str>,
        ts: u64,
    ) -> TeamEvent {
        make_event(event, role, Some(task), error, ts)
    }

    /// `task_escalated` event with an explicit timestamp.
    fn escalated(role: &str, task: &str, ts: u64) -> TeamEvent {
        let mut event = TeamEvent::task_escalated(role, task, None);
        event.ts = ts;
        event
    }

    /// One-element pattern slice for `generate_pattern_notifications`.
    fn single_pattern(
        pattern_type: PatternType,
        frequency: u32,
        entities: &[&str],
        description: &str,
    ) -> [PatternMatch; 1] {
        [PatternMatch {
            pattern_type,
            frequency,
            affected_entities: entities.iter().map(|entity| entity.to_string()).collect(),
            description: description.to_string(),
        }]
    }

    /// Tracker pre-loaded with `failures` test-failure errors from `eng-1` at ts 1..=failures.
    fn failing_tracker(failures: u64) -> FailureTracker {
        let mut tracker = FailureTracker::new(20);
        for ts in 1..=failures {
            tracker.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }
        tracker
    }

    #[test]
    fn test_detect_repeated_test_failures() {
        let mut window = FailureWindow::new(20);
        for ts in 1..=3 {
            window.push(&error_event("test_failure", "eng-1", "tests failed", ts));
        }

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let top = &patterns[0];
        assert_eq!(top.pattern_type, PatternType::RepeatedTestFailure);
        assert_eq!(top.frequency, 3);
        assert_eq!(top.affected_entities, vec!["eng-1".to_string()]);
    }

    #[test]
    fn test_detect_escalation_cluster() {
        let mut window = FailureWindow::new(20);
        window.push(&escalated("eng-1", "101", 1));
        window.push(&escalated("eng-2", "102", 2));

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let top = &patterns[0];
        assert_eq!(top.pattern_type, PatternType::EscalationCluster);
        assert_eq!(top.frequency, 2);
        assert_eq!(
            top.affected_entities,
            vec!["101".to_string(), "102".to_string()]
        );
    }

    #[test]
    fn test_detect_merge_conflict_recurrence() {
        let mut window = FailureWindow::new(20);
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 1));
        window.push(&conflict_event(
            "loop_step_error",
            "eng-1",
            "202",
            Some("rebase conflict on main"),
            2,
        ));

        let patterns = window.detect_failure_patterns();

        assert_eq!(patterns.len(), 1);
        let top = &patterns[0];
        assert_eq!(top.pattern_type, PatternType::MergeConflictRecurrence);
        assert_eq!(top.frequency, 2);
        assert_eq!(
            top.affected_entities,
            vec!["201".to_string(), "202".to_string()]
        );
    }

    #[test]
    fn test_window_rollover() {
        let mut window = FailureWindow::new(5);
        for index in 0..10 {
            let task = format!("task-{index}");
            window.push(&conflict_event("merge_conflict", "eng-1", &task, None, index));
        }

        assert_eq!(window.events.len(), 5);
        let oldest = window.events.front().and_then(|event| event.task.as_deref());
        let newest = window.events.back().and_then(|event| event.task.as_deref());
        assert_eq!(oldest, Some("task-5"));
        assert_eq!(newest, Some("task-9"));
    }

    #[test]
    fn test_no_patterns_when_below_threshold() {
        let mut window = FailureWindow::new(20);
        window.push(&error_event("test_failure", "eng-1", "tests failed", 1));
        window.push(&escalated("eng-1", "101", 2));
        window.push(&conflict_event("merge_conflict", "eng-1", "201", None, 3));

        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn test_non_failure_events_ignored() {
        let mut window = FailureWindow::new(20);
        let mut routed = TeamEvent::message_routed("manager", "eng-1");
        routed.ts = 2;
        window.push(&TeamEvent::daemon_started());
        window.push(&routed);

        assert!(window.events.is_empty());
        assert!(window.detect_failure_patterns().is_empty());
    }

    #[test]
    fn notification_threshold_triggers() {
        let patterns = single_pattern(
            PatternType::RepeatedTestFailure,
            3,
            &["eng-1"],
            "eng-1 failing repeatedly",
        );

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert_eq!(notifications[0].frequency, 3);
    }

    #[test]
    fn notification_below_threshold_is_silent() {
        let patterns = single_pattern(
            PatternType::RepeatedTestFailure,
            2,
            &["eng-1"],
            "eng-1 failing repeatedly",
        );

        assert!(generate_pattern_notifications(&patterns, 3, 5).is_empty());
    }

    #[test]
    fn severity_routes_to_architect() {
        let patterns = single_pattern(
            PatternType::MergeConflictRecurrence,
            5,
            &["201", "202"],
            "conflicts recurring",
        );

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_architect);
    }

    #[test]
    fn below_severity_routes_to_manager_only() {
        let patterns = single_pattern(
            PatternType::EscalationCluster,
            4,
            &["101", "102"],
            "escalations piling up",
        );

        let notifications = generate_pattern_notifications(&patterns, 3, 5);

        assert_eq!(notifications.len(), 1);
        assert!(notifications[0].notify_manager);
        assert!(!notifications[0].notify_architect);
    }

    #[test]
    fn notification_message_is_actionable() {
        let patterns = single_pattern(
            PatternType::MergeConflictRecurrence,
            4,
            &["201"],
            "conflicts recurring",
        );

        let notifications = generate_pattern_notifications(&patterns, 3, 5);
        let message = notifications[0].message.to_ascii_lowercase();

        for keyword in ["pause", "rebase", "fix"] {
            assert!(message.contains(keyword));
        }
    }

    #[test]
    fn tracker_suppresses_repeated_notifications_without_new_failures() {
        let mut tracker = failing_tracker(3);

        let initial = tracker.pattern_notifications(3, 5);
        let repeated = tracker.pattern_notifications(3, 5);

        assert_eq!(initial.len(), 1);
        assert!(repeated.is_empty());
    }

    #[test]
    fn tracker_reemits_when_pattern_frequency_increases() {
        let mut tracker = failing_tracker(3);
        assert_eq!(tracker.pattern_notifications(3, 5).len(), 1);

        tracker.push(&error_event("test_failure", "eng-1", "tests failed", 4));

        let notifications = tracker.pattern_notifications(3, 5);

        assert_eq!(notifications.len(), 1);
        assert_eq!(notifications[0].frequency, 4);
    }
}