Skip to main content

ralph_core/event_loop/
loop_state.rs

1//! Loop state tracking for the event loop.
2//!
3//! This module contains the `LoopState` struct that tracks the current
4//! state of the orchestration loop including iteration count, failures,
5//! timing, and hat activation tracking.
6
7use ralph_proto::{Event, HatId};
8use std::collections::{HashMap, HashSet};
9use std::hash::{DefaultHasher, Hash, Hasher};
10use std::time::{Duration, Instant};
11
12/// Fingerprint of the last emitted event for stale loop detection.
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct EventSignature {
15    pub topic: String,
16    pub source: Option<HatId>,
17    pub payload_fingerprint: u64,
18}
19
20/// Current state of the event loop.
21#[derive(Debug)]
22pub struct LoopState {
23    /// Current iteration number (1-indexed).
24    pub iteration: u32,
25    /// Number of consecutive failures.
26    pub consecutive_failures: u32,
27    /// Cumulative cost in USD (if tracked).
28    pub cumulative_cost: f64,
29    /// When the loop started.
30    pub started_at: Instant,
31    /// The last hat that executed.
32    pub last_hat: Option<HatId>,
33    /// Consecutive blocked events from the same hat.
34    pub consecutive_blocked: u32,
35    /// Hat that emitted the last blocked event.
36    pub last_blocked_hat: Option<HatId>,
37    /// Per-task block counts for task-level thrashing detection.
38    pub task_block_counts: HashMap<String, u32>,
39    /// Tasks that have been abandoned after 3+ blocks.
40    pub abandoned_tasks: Vec<String>,
41    /// Count of times planner dispatched an already-abandoned task.
42    pub abandoned_task_redispatches: u32,
43    /// Consecutive malformed JSONL lines encountered (for validation backpressure).
44    pub consecutive_malformed_events: u32,
45    /// Whether a completion event has been observed in JSONL.
46    pub completion_requested: bool,
47
48    /// Per-hat activation counts (used for max_activations).
49    pub hat_activation_counts: HashMap<HatId, u32>,
50
51    /// Hats for which `<hat_id>.exhausted` has been emitted.
52    pub exhausted_hats: HashSet<HatId>,
53
54    /// When the last Telegram check-in message was sent.
55    /// `None` means no check-in has been sent yet.
56    pub last_checkin_at: Option<Instant>,
57
58    /// Hat IDs that were active in the last iteration.
59    /// Used to inject `default_publishes` when agent writes no events.
60    pub last_active_hat_ids: Vec<HatId>,
61
62    /// Topics seen during the loop's lifetime (for event chain validation).
63    pub seen_topics: HashSet<String>,
64
65    /// The last event signature emitted (for stale loop detection).
66    pub last_emitted_signature: Option<EventSignature>,
67
68    /// Consecutive times the same event signature was emitted (for stale loop detection).
69    pub consecutive_same_signature: u32,
70
71    /// Set to true when a loop.cancel event is detected.
72    pub cancellation_requested: bool,
73}
74
75impl Default for LoopState {
76    fn default() -> Self {
77        Self {
78            iteration: 0,
79            consecutive_failures: 0,
80            cumulative_cost: 0.0,
81            started_at: Instant::now(),
82            last_hat: None,
83            consecutive_blocked: 0,
84            last_blocked_hat: None,
85            task_block_counts: HashMap::new(),
86            abandoned_tasks: Vec::new(),
87            abandoned_task_redispatches: 0,
88            consecutive_malformed_events: 0,
89            completion_requested: false,
90            hat_activation_counts: HashMap::new(),
91            exhausted_hats: HashSet::new(),
92            last_checkin_at: None,
93            last_active_hat_ids: Vec::new(),
94            seen_topics: HashSet::new(),
95            last_emitted_signature: None,
96            consecutive_same_signature: 0,
97            cancellation_requested: false,
98        }
99    }
100}
101
102impl LoopState {
103    /// Creates a new loop state.
104    pub fn new() -> Self {
105        Self::default()
106    }
107
108    /// Returns the elapsed time since the loop started.
109    pub fn elapsed(&self) -> Duration {
110        self.started_at.elapsed()
111    }
112
113    fn event_counts_toward_stale_loop(event: &Event) -> bool {
114        !matches!(event.topic.as_str(), "task.complete")
115    }
116
117    /// Record that an event has been seen during this loop run.
118    ///
119    /// Also tracks consecutive same-signature emissions for stale loop detection.
120    pub fn record_event(&mut self, event: &Event) {
121        self.seen_topics.insert(event.topic.to_string());
122
123        if !Self::event_counts_toward_stale_loop(event) {
124            self.consecutive_same_signature = 0;
125            self.last_emitted_signature = Some(EventSignature::from_event(event));
126            return;
127        }
128
129        let signature = EventSignature::from_event(event);
130        if self.last_emitted_signature.as_ref() == Some(&signature) {
131            self.consecutive_same_signature += 1;
132        } else {
133            self.consecutive_same_signature = 1;
134            self.last_emitted_signature = Some(signature);
135        }
136    }
137
138    /// Check if all required topics have been seen.
139    pub fn missing_required_events<'a>(&self, required: &'a [String]) -> Vec<&'a String> {
140        required
141            .iter()
142            .filter(|topic| !self.seen_topics.contains(topic.as_str()))
143            .collect()
144    }
145}
146
147impl EventSignature {
148    pub fn from_event(event: &Event) -> Self {
149        Self {
150            topic: event.topic.to_string(),
151            source: event.source.clone(),
152            payload_fingerprint: fingerprint_payload(&event.payload),
153        }
154    }
155}
156
157fn fingerprint_payload(payload: &str) -> u64 {
158    let mut hasher = DefaultHasher::new();
159    payload.hash(&mut hasher);
160    hasher.finish()
161}
162
163#[cfg(test)]
164mod tests {
165    use super::LoopState;
166    use ralph_proto::Event;
167
168    #[test]
169    fn repeated_task_complete_does_not_accumulate_stale_loop_count() {
170        let mut state = LoopState::new();
171
172        state.record_event(&Event::new("task.complete", "task 1 complete"));
173        assert_eq!(state.consecutive_same_signature, 0);
174
175        state.record_event(&Event::new("task.complete", "task 2 complete"));
176        state.record_event(&Event::new("task.complete", "task 3 complete"));
177
178        assert_eq!(state.consecutive_same_signature, 0);
179        assert_eq!(
180            state
181                .last_emitted_signature
182                .as_ref()
183                .map(|s| s.topic.as_str()),
184            Some("task.complete")
185        );
186    }
187
188    #[test]
189    fn repeated_non_progress_topics_still_accumulate_stale_loop_count() {
190        let mut state = LoopState::new();
191
192        state.record_event(&Event::new("task.resume", "same payload"));
193        state.record_event(&Event::new("task.resume", "same payload"));
194        state.record_event(&Event::new("task.resume", "same payload"));
195
196        assert_eq!(state.consecutive_same_signature, 3);
197        assert_eq!(
198            state
199                .last_emitted_signature
200                .as_ref()
201                .map(|s| s.topic.as_str()),
202            Some("task.resume")
203        );
204    }
205}