Skip to main content

ralph_core/event_loop/
loop_state.rs

1//! Loop state tracking for the event loop.
2//!
3//! This module contains the `LoopState` struct that tracks the current
4//! state of the orchestration loop including iteration count, failures,
5//! timing, and hat activation tracking.
6
7use ralph_proto::HatId;
8use std::collections::{HashMap, HashSet};
9use std::time::{Duration, Instant};
10
11/// Current state of the event loop.
12#[derive(Debug)]
13pub struct LoopState {
14    /// Current iteration number (1-indexed).
15    pub iteration: u32,
16    /// Number of consecutive failures.
17    pub consecutive_failures: u32,
18    /// Cumulative cost in USD (if tracked).
19    pub cumulative_cost: f64,
20    /// When the loop started.
21    pub started_at: Instant,
22    /// The last hat that executed.
23    pub last_hat: Option<HatId>,
24    /// Consecutive blocked events from the same hat.
25    pub consecutive_blocked: u32,
26    /// Hat that emitted the last blocked event.
27    pub last_blocked_hat: Option<HatId>,
28    /// Per-task block counts for task-level thrashing detection.
29    pub task_block_counts: HashMap<String, u32>,
30    /// Tasks that have been abandoned after 3+ blocks.
31    pub abandoned_tasks: Vec<String>,
32    /// Count of times planner dispatched an already-abandoned task.
33    pub abandoned_task_redispatches: u32,
34    /// Consecutive malformed JSONL lines encountered (for validation backpressure).
35    pub consecutive_malformed_events: u32,
36    /// Whether a completion event has been observed in JSONL.
37    pub completion_requested: bool,
38
39    /// Per-hat activation counts (used for max_activations).
40    pub hat_activation_counts: HashMap<HatId, u32>,
41
42    /// Hats for which `<hat_id>.exhausted` has been emitted.
43    pub exhausted_hats: HashSet<HatId>,
44
45    /// When the last Telegram check-in message was sent.
46    /// `None` means no check-in has been sent yet.
47    pub last_checkin_at: Option<Instant>,
48
49    /// Hat IDs that were active in the last iteration.
50    /// Used to inject `default_publishes` when agent writes no events.
51    pub last_active_hat_ids: Vec<HatId>,
52
53    /// Topics seen during the loop's lifetime (for event chain validation).
54    pub seen_topics: HashSet<String>,
55
56    /// The last topic emitted (for stale loop detection).
57    pub last_emitted_topic: Option<String>,
58
59    /// Consecutive times the same topic was emitted (for stale loop detection).
60    pub consecutive_same_topic: u32,
61
62    /// Set to true when a loop.cancel event is detected.
63    pub cancellation_requested: bool,
64}
65
66impl Default for LoopState {
67    fn default() -> Self {
68        Self {
69            iteration: 0,
70            consecutive_failures: 0,
71            cumulative_cost: 0.0,
72            started_at: Instant::now(),
73            last_hat: None,
74            consecutive_blocked: 0,
75            last_blocked_hat: None,
76            task_block_counts: HashMap::new(),
77            abandoned_tasks: Vec::new(),
78            abandoned_task_redispatches: 0,
79            consecutive_malformed_events: 0,
80            completion_requested: false,
81            hat_activation_counts: HashMap::new(),
82            exhausted_hats: HashSet::new(),
83            last_checkin_at: None,
84            last_active_hat_ids: Vec::new(),
85            seen_topics: HashSet::new(),
86            last_emitted_topic: None,
87            consecutive_same_topic: 0,
88            cancellation_requested: false,
89        }
90    }
91}
92
93impl LoopState {
94    /// Creates a new loop state.
95    pub fn new() -> Self {
96        Self::default()
97    }
98
99    /// Returns the elapsed time since the loop started.
100    pub fn elapsed(&self) -> Duration {
101        self.started_at.elapsed()
102    }
103
104    /// Record that a topic has been seen during this loop run.
105    ///
106    /// Also tracks consecutive same-topic emissions for stale loop detection.
107    pub fn record_topic(&mut self, topic: &str) {
108        self.seen_topics.insert(topic.to_string());
109
110        if self.last_emitted_topic.as_deref() == Some(topic) {
111            self.consecutive_same_topic += 1;
112        } else {
113            self.consecutive_same_topic = 1;
114            self.last_emitted_topic = Some(topic.to_string());
115        }
116    }
117
118    /// Check if all required topics have been seen.
119    pub fn missing_required_events<'a>(&self, required: &'a [String]) -> Vec<&'a String> {
120        required
121            .iter()
122            .filter(|topic| !self.seen_topics.contains(topic.as_str()))
123            .collect()
124    }
125}