ralph_core/event_loop/
loop_state.rs1use ralph_proto::{Event, HatId};
8use std::collections::{HashMap, HashSet};
9use std::hash::{DefaultHasher, Hash, Hasher};
10use std::time::{Duration, Instant};
11
12#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct EventSignature {
15 pub topic: String,
16 pub source: Option<HatId>,
17 pub payload_fingerprint: u64,
18}
19
20#[derive(Debug)]
22pub struct LoopState {
23 pub iteration: u32,
25 pub consecutive_failures: u32,
27 pub cumulative_cost: f64,
29 pub started_at: Instant,
31 pub last_hat: Option<HatId>,
33 pub consecutive_blocked: u32,
35 pub last_blocked_hat: Option<HatId>,
37 pub task_block_counts: HashMap<String, u32>,
39 pub abandoned_tasks: Vec<String>,
41 pub abandoned_task_redispatches: u32,
43 pub consecutive_malformed_events: u32,
45 pub completion_requested: bool,
47
48 pub hat_activation_counts: HashMap<HatId, u32>,
50
51 pub exhausted_hats: HashSet<HatId>,
53
54 pub last_checkin_at: Option<Instant>,
57
58 pub last_active_hat_ids: Vec<HatId>,
61
62 pub seen_topics: HashSet<String>,
64
65 pub last_emitted_signature: Option<EventSignature>,
67
68 pub consecutive_same_signature: u32,
70
71 pub cancellation_requested: bool,
73}
74
75impl Default for LoopState {
76 fn default() -> Self {
77 Self {
78 iteration: 0,
79 consecutive_failures: 0,
80 cumulative_cost: 0.0,
81 started_at: Instant::now(),
82 last_hat: None,
83 consecutive_blocked: 0,
84 last_blocked_hat: None,
85 task_block_counts: HashMap::new(),
86 abandoned_tasks: Vec::new(),
87 abandoned_task_redispatches: 0,
88 consecutive_malformed_events: 0,
89 completion_requested: false,
90 hat_activation_counts: HashMap::new(),
91 exhausted_hats: HashSet::new(),
92 last_checkin_at: None,
93 last_active_hat_ids: Vec::new(),
94 seen_topics: HashSet::new(),
95 last_emitted_signature: None,
96 consecutive_same_signature: 0,
97 cancellation_requested: false,
98 }
99 }
100}
101
102impl LoopState {
103 pub fn new() -> Self {
105 Self::default()
106 }
107
108 pub fn elapsed(&self) -> Duration {
110 self.started_at.elapsed()
111 }
112
113 fn event_counts_toward_stale_loop(event: &Event) -> bool {
114 !matches!(event.topic.as_str(), "task.complete")
115 }
116
117 pub fn record_event(&mut self, event: &Event) {
121 self.seen_topics.insert(event.topic.to_string());
122
123 if !Self::event_counts_toward_stale_loop(event) {
124 self.consecutive_same_signature = 0;
125 self.last_emitted_signature = Some(EventSignature::from_event(event));
126 return;
127 }
128
129 let signature = EventSignature::from_event(event);
130 if self.last_emitted_signature.as_ref() == Some(&signature) {
131 self.consecutive_same_signature += 1;
132 } else {
133 self.consecutive_same_signature = 1;
134 self.last_emitted_signature = Some(signature);
135 }
136 }
137
138 pub fn missing_required_events<'a>(&self, required: &'a [String]) -> Vec<&'a String> {
140 required
141 .iter()
142 .filter(|topic| !self.seen_topics.contains(topic.as_str()))
143 .collect()
144 }
145}
146
147impl EventSignature {
148 pub fn from_event(event: &Event) -> Self {
149 Self {
150 topic: event.topic.to_string(),
151 source: event.source.clone(),
152 payload_fingerprint: fingerprint_payload(&event.payload),
153 }
154 }
155}
156
/// Reduces a payload string to a 64-bit hash using the standard library's
/// `DefaultHasher`.
///
/// Equal strings always yield equal fingerprints within one build. The std
/// documentation does not fix the hashing algorithm across Rust releases, so
/// the value should not be persisted or compared across builds.
fn fingerprint_payload(payload: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(payload, &mut state);
    Hasher::finish(&state)
}
162
#[cfg(test)]
mod tests {
    use super::LoopState;
    use ralph_proto::Event;

    /// Topic of the signature currently stored in `state`, if there is one.
    fn last_topic(state: &LoopState) -> Option<&str> {
        state
            .last_emitted_signature
            .as_ref()
            .map(|signature| signature.topic.as_str())
    }

    /// `task.complete` events never build up a repeated-signature run, but
    /// they are still remembered as the last emitted signature.
    #[test]
    fn repeated_task_complete_does_not_accumulate_stale_loop_count() {
        let mut state = LoopState::new();

        state.record_event(&Event::new("task.complete", "task 1 complete"));
        assert_eq!(state.consecutive_same_signature, 0);

        for payload in ["task 2 complete", "task 3 complete"] {
            state.record_event(&Event::new("task.complete", payload));
        }

        assert_eq!(state.consecutive_same_signature, 0);
        assert_eq!(last_topic(&state), Some("task.complete"));
    }

    /// Identical events on any other topic are counted one by one.
    #[test]
    fn repeated_non_progress_topics_still_accumulate_stale_loop_count() {
        let mut state = LoopState::new();

        for _ in 0..3 {
            state.record_event(&Event::new("task.resume", "same payload"));
        }

        assert_eq!(state.consecutive_same_signature, 3);
        assert_eq!(last_topic(&state), Some("task.resume"));
    }
}