1use crate::config::HatConfig;
7use crate::event_reader::Event;
8use crate::hat_registry::HatRegistry;
9use ralph_proto::HatId;
10use std::collections::HashMap;
11
12#[derive(Debug)]
14pub struct DetectedWave {
15 pub wave_id: String,
17 pub target_hat: HatId,
19 pub hat_config: HatConfig,
21 pub events: Vec<Event>,
23 pub total: u32,
25}
26
27impl DetectedWave {
28 pub fn timeout_secs(&self) -> u64 {
32 self.hat_config
33 .timeout
34 .map(u64::from)
35 .or_else(|| {
36 self.hat_config
37 .aggregate
38 .as_ref()
39 .map(|a| u64::from(a.timeout))
40 })
41 .unwrap_or(300)
42 }
43}
44
45pub fn detect_wave_events(events: &[Event], registry: &HatRegistry) -> Option<DetectedWave> {
53 let mut wave_groups: HashMap<&str, Vec<&Event>> = HashMap::new();
55 for event in events {
56 if let Some(ref wave_id) = event.wave_id {
57 wave_groups.entry(wave_id.as_str()).or_default().push(event);
58 }
59 }
60
61 if wave_groups.is_empty() {
62 return None;
63 }
64
65 let wave_id = *wave_groups.keys().min()?;
67 if wave_groups.len() > 1 {
68 tracing::warn!(
69 selected = wave_id,
70 total_waves = wave_groups.len(),
71 "Multiple waves detected in single iteration, processing only the first"
72 );
73 }
74 let wave_events = wave_groups.remove(wave_id)?;
75
76 let first = wave_events.first()?;
78 let topic = &first.topic;
79 let wave_total = first.wave_total?;
80
81 for event in &wave_events {
82 if event.topic != *topic {
83 tracing::warn!(
84 wave_id,
85 expected_topic = %topic,
86 actual_topic = %event.topic,
87 "Inconsistent topic in wave events, skipping wave"
88 );
89 return None;
90 }
91 if event.wave_total != Some(wave_total) {
92 tracing::warn!(
93 wave_id,
94 "Inconsistent wave_total in wave events, skipping wave"
95 );
96 return None;
97 }
98 }
99
100 let target_hat_id = registry.find_by_trigger(topic)?;
102 let hat_config = registry.get_config(target_hat_id)?.clone();
103
104 if hat_config.concurrency <= 1 {
106 return None;
107 }
108
109 let mut sorted_events: Vec<Event> = wave_events.into_iter().cloned().collect();
111 sorted_events.sort_by_key(|e| e.wave_index.unwrap_or(0));
112
113 Some(DetectedWave {
114 wave_id: wave_id.to_string(),
115 target_hat: target_hat_id.clone(),
116 hat_config,
117 events: sorted_events,
118 total: wave_total,
119 })
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use crate::config::RalphConfig;
126
127 fn make_wave_event(topic: &str, payload: &str, wave_id: &str, index: u32, total: u32) -> Event {
128 Event {
129 topic: topic.to_string(),
130 payload: Some(payload.to_string()),
131 ts: "2025-01-01T00:00:00Z".to_string(),
132 wave_id: Some(wave_id.to_string()),
133 wave_index: Some(index),
134 wave_total: Some(total),
135 }
136 }
137
138 fn make_registry_with_concurrent_hat() -> HatRegistry {
139 let yaml = r#"
140 hats:
141 reviewer:
142 name: "Reviewer"
143 triggers: ["review.file"]
144 publishes: ["review.done"]
145 instructions: "Review files"
146 concurrency: 4
147 "#;
148 let config: RalphConfig = serde_yaml::from_str(yaml).unwrap();
149 HatRegistry::from_config(&config)
150 }
151
152 fn make_registry_with_sequential_hat() -> HatRegistry {
153 let yaml = r#"
154 hats:
155 builder:
156 name: "Builder"
157 triggers: ["build.start"]
158 publishes: ["build.done"]
159 instructions: "Build code"
160 "#;
161 let config: RalphConfig = serde_yaml::from_str(yaml).unwrap();
162 HatRegistry::from_config(&config)
163 }
164
165 #[test]
166 fn test_detect_wave_events_basic() {
167 let registry = make_registry_with_concurrent_hat();
168 let events = vec![
169 make_wave_event("review.file", "src/main.rs", "w-abc", 0, 3),
170 make_wave_event("review.file", "src/lib.rs", "w-abc", 1, 3),
171 make_wave_event("review.file", "src/config.rs", "w-abc", 2, 3),
172 ];
173
174 let wave = detect_wave_events(&events, ®istry).unwrap();
175 assert_eq!(wave.wave_id, "w-abc");
176 assert_eq!(wave.total, 3);
177 assert_eq!(wave.events.len(), 3);
178 assert_eq!(wave.target_hat.as_str(), "reviewer");
179 assert_eq!(wave.hat_config.concurrency, 4);
180 }
181
182 #[test]
183 fn test_detect_ignores_non_wave_events() {
184 let registry = make_registry_with_concurrent_hat();
185 let events = vec![Event {
186 topic: "review.file".to_string(),
187 payload: Some("src/main.rs".to_string()),
188 ts: "2025-01-01T00:00:00Z".to_string(),
189 wave_id: None,
190 wave_index: None,
191 wave_total: None,
192 }];
193
194 assert!(detect_wave_events(&events, ®istry).is_none());
195 }
196
197 #[test]
198 fn test_detect_ignores_sequential_hat() {
199 let registry = make_registry_with_sequential_hat();
200 let events = vec![
201 make_wave_event("build.start", "payload", "w-abc", 0, 2),
202 make_wave_event("build.start", "payload", "w-abc", 1, 2),
203 ];
204
205 assert!(detect_wave_events(&events, ®istry).is_none());
207 }
208
209 #[test]
210 fn test_detect_rejects_inconsistent_topics() {
211 let registry = make_registry_with_concurrent_hat();
212 let events = vec![
213 make_wave_event("review.file", "src/main.rs", "w-abc", 0, 2),
214 make_wave_event("review.other", "src/lib.rs", "w-abc", 1, 2),
215 ];
216
217 assert!(detect_wave_events(&events, ®istry).is_none());
218 }
219
220 #[test]
221 fn test_detect_sorts_by_index() {
222 let registry = make_registry_with_concurrent_hat();
223 let events = vec![
225 make_wave_event("review.file", "third", "w-abc", 2, 3),
226 make_wave_event("review.file", "first", "w-abc", 0, 3),
227 make_wave_event("review.file", "second", "w-abc", 1, 3),
228 ];
229
230 let wave = detect_wave_events(&events, ®istry).unwrap();
231 assert_eq!(wave.events[0].payload.as_deref(), Some("first"));
232 assert_eq!(wave.events[1].payload.as_deref(), Some("second"));
233 assert_eq!(wave.events[2].payload.as_deref(), Some("third"));
234 }
235
236 #[test]
237 fn test_empty_events_returns_none() {
238 let registry = make_registry_with_concurrent_hat();
239 assert!(detect_wave_events(&[], ®istry).is_none());
240 }
241
242 #[test]
243 fn test_unknown_topic_returns_none() {
244 let registry = make_registry_with_concurrent_hat();
245 let events = vec![make_wave_event("unknown.topic", "payload", "w-abc", 0, 1)];
246
247 assert!(detect_wave_events(&events, ®istry).is_none());
248 }
249}