Skip to main content

ralph_core/
wave_detection.rs

1//! Wave event detection from JSONL events.
2//!
3//! Groups events by wave_id, validates consistency, and resolves
4//! the target hat for wave execution.
5
6use crate::config::HatConfig;
7use crate::event_reader::Event;
8use crate::hat_registry::HatRegistry;
9use ralph_proto::HatId;
10use std::collections::HashMap;
11
12/// A detected wave ready for execution.
13#[derive(Debug)]
14pub struct DetectedWave {
15    /// Wave correlation ID.
16    pub wave_id: String,
17    /// Hat that should process these events.
18    pub target_hat: HatId,
19    /// Configuration for the target hat.
20    pub hat_config: HatConfig,
21    /// Individual events in this wave, ordered by wave_index.
22    pub events: Vec<Event>,
23    /// Total expected events in the wave.
24    pub total: u32,
25}
26
27impl DetectedWave {
28    /// Returns the effective timeout in seconds for wave workers.
29    ///
30    /// Priority: hat.timeout > hat.aggregate.timeout > 300s default.
31    pub fn timeout_secs(&self) -> u64 {
32        self.hat_config
33            .timeout
34            .map(u64::from)
35            .or_else(|| {
36                self.hat_config
37                    .aggregate
38                    .as_ref()
39                    .map(|a| u64::from(a.timeout))
40            })
41            .unwrap_or(300)
42    }
43}
44
45/// Detect wave events from a set of events.
46///
47/// Groups events by wave_id, validates that all events in a wave are consistent
48/// (same topic, matching wave_total), and resolves the target hat from the registry.
49///
50/// v1: Returns the first detected wave (one wave per iteration).
51/// Events without wave metadata are ignored.
52pub fn detect_wave_events(events: &[Event], registry: &HatRegistry) -> Option<DetectedWave> {
53    // Group events by wave_id
54    let mut wave_groups: HashMap<&str, Vec<&Event>> = HashMap::new();
55    for event in events {
56        if let Some(ref wave_id) = event.wave_id {
57            wave_groups.entry(wave_id.as_str()).or_default().push(event);
58        }
59    }
60
61    if wave_groups.is_empty() {
62        return None;
63    }
64
65    // v1: Take the lexicographically first wave_id (deterministic, one wave per iteration)
66    let wave_id = *wave_groups.keys().min()?;
67    if wave_groups.len() > 1 {
68        tracing::warn!(
69            selected = wave_id,
70            total_waves = wave_groups.len(),
71            "Multiple waves detected in single iteration, processing only the first"
72        );
73    }
74    let wave_events = wave_groups.remove(wave_id)?;
75
76    // Validate: all events must have the same topic and wave_total
77    let first = wave_events.first()?;
78    let topic = &first.topic;
79    let wave_total = first.wave_total?;
80
81    for event in &wave_events {
82        if event.topic != *topic {
83            tracing::warn!(
84                wave_id,
85                expected_topic = %topic,
86                actual_topic = %event.topic,
87                "Inconsistent topic in wave events, skipping wave"
88            );
89            return None;
90        }
91        if event.wave_total != Some(wave_total) {
92            tracing::warn!(
93                wave_id,
94                "Inconsistent wave_total in wave events, skipping wave"
95            );
96            return None;
97        }
98    }
99
100    // Resolve target hat from the event topic
101    let target_hat_id = registry.find_by_trigger(topic)?;
102    let hat_config = registry.get_config(target_hat_id)?.clone();
103
104    // Only trigger wave execution for hats with concurrency > 1
105    if hat_config.concurrency <= 1 {
106        return None;
107    }
108
109    // Sort events by wave_index for deterministic ordering
110    let mut sorted_events: Vec<Event> = wave_events.into_iter().cloned().collect();
111    sorted_events.sort_by_key(|e| e.wave_index.unwrap_or(0));
112
113    Some(DetectedWave {
114        wave_id: wave_id.to_string(),
115        target_hat: target_hat_id.clone(),
116        hat_config,
117        events: sorted_events,
118        total: wave_total,
119    })
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RalphConfig;

    /// Builds an event carrying full wave metadata (`wave_id`, `wave_index`, `wave_total`).
    fn make_wave_event(topic: &str, payload: &str, wave_id: &str, index: u32, total: u32) -> Event {
        Event {
            topic: topic.to_string(),
            payload: Some(payload.to_string()),
            ts: "2025-01-01T00:00:00Z".to_string(),
            wave_id: Some(wave_id.to_string()),
            wave_index: Some(index),
            wave_total: Some(total),
        }
    }

    /// Builds an event with no wave metadata at all.
    fn make_plain_event(topic: &str, payload: &str) -> Event {
        Event {
            topic: topic.to_string(),
            payload: Some(payload.to_string()),
            ts: "2025-01-01T00:00:00Z".to_string(),
            wave_id: None,
            wave_index: None,
            wave_total: None,
        }
    }

    /// Registry with a single hat triggered by `review.file` and `concurrency: 4`.
    fn make_registry_with_concurrent_hat() -> HatRegistry {
        let yaml = r#"
            hats:
              reviewer:
                name: "Reviewer"
                triggers: ["review.file"]
                publishes: ["review.done"]
                instructions: "Review files"
                concurrency: 4
        "#;
        let config: RalphConfig = serde_yaml::from_str(yaml).unwrap();
        HatRegistry::from_config(&config)
    }

    /// Registry with a single hat triggered by `build.start` and no explicit concurrency.
    fn make_registry_with_sequential_hat() -> HatRegistry {
        let yaml = r#"
            hats:
              builder:
                name: "Builder"
                triggers: ["build.start"]
                publishes: ["build.done"]
                instructions: "Build code"
        "#;
        let config: RalphConfig = serde_yaml::from_str(yaml).unwrap();
        HatRegistry::from_config(&config)
    }

    #[test]
    fn test_detect_wave_events_basic() {
        let registry = make_registry_with_concurrent_hat();
        let events = vec![
            make_wave_event("review.file", "src/main.rs", "w-abc", 0, 3),
            make_wave_event("review.file", "src/lib.rs", "w-abc", 1, 3),
            make_wave_event("review.file", "src/config.rs", "w-abc", 2, 3),
        ];

        let wave = detect_wave_events(&events, &registry).unwrap();
        assert_eq!(wave.wave_id, "w-abc");
        assert_eq!(wave.total, 3);
        assert_eq!(wave.events.len(), 3);
        assert_eq!(wave.target_hat.as_str(), "reviewer");
        assert_eq!(wave.hat_config.concurrency, 4);
    }

    #[test]
    fn test_detect_ignores_non_wave_events() {
        let registry = make_registry_with_concurrent_hat();
        let events = vec![make_plain_event("review.file", "src/main.rs")];

        assert!(detect_wave_events(&events, &registry).is_none());
    }

    #[test]
    fn test_detect_skips_non_wave_events_mixed_with_wave() {
        let registry = make_registry_with_concurrent_hat();
        let events = vec![
            make_plain_event("review.file", "not part of any wave"),
            make_wave_event("review.file", "src/main.rs", "w-abc", 0, 2),
            make_wave_event("review.file", "src/lib.rs", "w-abc", 1, 2),
        ];

        // Only the events that carry a wave_id end up in the detected wave.
        let wave = detect_wave_events(&events, &registry).unwrap();
        assert_eq!(wave.wave_id, "w-abc");
        assert_eq!(wave.events.len(), 2);
    }

    #[test]
    fn test_detect_ignores_sequential_hat() {
        let registry = make_registry_with_sequential_hat();
        let events = vec![
            make_wave_event("build.start", "payload", "w-abc", 0, 2),
            make_wave_event("build.start", "payload", "w-abc", 1, 2),
        ];

        // Hat has concurrency=1 (default), so wave detection returns None
        assert!(detect_wave_events(&events, &registry).is_none());
    }

    #[test]
    fn test_detect_rejects_inconsistent_topics() {
        let registry = make_registry_with_concurrent_hat();
        let events = vec![
            make_wave_event("review.file", "src/main.rs", "w-abc", 0, 2),
            make_wave_event("review.other", "src/lib.rs", "w-abc", 1, 2),
        ];

        assert!(detect_wave_events(&events, &registry).is_none());
    }

    #[test]
    fn test_detect_rejects_inconsistent_wave_total() {
        let registry = make_registry_with_concurrent_hat();
        let events = vec![
            make_wave_event("review.file", "src/main.rs", "w-abc", 0, 2),
            make_wave_event("review.file", "src/lib.rs", "w-abc", 1, 3),
        ];

        assert!(detect_wave_events(&events, &registry).is_none());
    }

    #[test]
    fn test_detect_rejects_missing_wave_total() {
        let registry = make_registry_with_concurrent_hat();
        let mut event = make_wave_event("review.file", "src/main.rs", "w-abc", 0, 1);
        event.wave_total = None;

        assert!(detect_wave_events(&[event], &registry).is_none());
    }

    #[test]
    fn test_multiple_waves_selects_lexicographically_first() {
        let registry = make_registry_with_concurrent_hat();
        // The wave that sorts later appears first in the input on purpose.
        let events = vec![
            make_wave_event("review.file", "b0", "w-bbb", 0, 1),
            make_wave_event("review.file", "a0", "w-aaa", 0, 2),
            make_wave_event("review.file", "a1", "w-aaa", 1, 2),
        ];

        let wave = detect_wave_events(&events, &registry).unwrap();
        assert_eq!(wave.wave_id, "w-aaa");
        assert_eq!(wave.total, 2);
        assert_eq!(wave.events.len(), 2);
    }

    #[test]
    fn test_detect_sorts_by_index() {
        let registry = make_registry_with_concurrent_hat();
        // Events arrive out of order
        let events = vec![
            make_wave_event("review.file", "third", "w-abc", 2, 3),
            make_wave_event("review.file", "first", "w-abc", 0, 3),
            make_wave_event("review.file", "second", "w-abc", 1, 3),
        ];

        let wave = detect_wave_events(&events, &registry).unwrap();
        assert_eq!(wave.events[0].payload.as_deref(), Some("first"));
        assert_eq!(wave.events[1].payload.as_deref(), Some("second"));
        assert_eq!(wave.events[2].payload.as_deref(), Some("third"));
    }

    #[test]
    fn test_empty_events_returns_none() {
        let registry = make_registry_with_concurrent_hat();
        assert!(detect_wave_events(&[], &registry).is_none());
    }

    #[test]
    fn test_unknown_topic_returns_none() {
        let registry = make_registry_with_concurrent_hat();
        let events = vec![make_wave_event("unknown.topic", "payload", "w-abc", 0, 1)];

        assert!(detect_wave_events(&events, &registry).is_none());
    }
}