use ralph_proto::{Event, HatId};
use std::collections::{HashMap, HashSet};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::time::{Duration, Instant};
/// Compact identity of an event, used to tell whether two events are "the same".
///
/// Two signatures compare equal when their topic, source and payload
/// fingerprint are all equal. The payload itself is not stored, only a 64-bit
/// hash of it, so two different payloads could in principle compare equal if
/// their hashes collide.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventSignature {
/// Topic of the event, stored as an owned string.
pub topic: String,
/// Source of the event, cloned from the event's `source` field; `None` when
/// the event carries no source.
pub source: Option<HatId>,
/// 64-bit hash of the event payload, as produced by `fingerprint_payload`.
pub payload_fingerprint: u64,
}
/// Mutable bookkeeping for a single loop run.
///
/// Of these fields, the methods visible alongside this struct only write
/// `seen_topics`, `last_emitted_signature` and `consecutive_same_signature`
/// (in `record_event`) and only read `started_at` (in `elapsed`). All other
/// fields are public and presumably maintained by callers elsewhere; the
/// per-field notes for those are inferred from their names and should be
/// confirmed against the call sites.
#[derive(Debug)]
pub struct LoopState {
/// Iteration counter; starts at 0.
pub iteration: u32,
/// Presumably the number of failures in a row; starts at 0 (confirm with callers).
pub consecutive_failures: u32,
/// Running cost total; starts at 0.0. The unit is not visible here.
pub cumulative_cost: f64,
/// Instant captured when the state was constructed; `elapsed` measures from it.
pub started_at: Instant,
/// Presumably the hat that ran most recently; starts as `None` (confirm with callers).
pub last_hat: Option<HatId>,
/// Presumably the number of blocked results in a row; starts at 0 (confirm with callers).
pub consecutive_blocked: u32,
/// Presumably the hat that was last reported blocked; starts as `None` (confirm with callers).
pub last_blocked_hat: Option<HatId>,
/// Per-key block counters, keyed by `String` (presumably a task identifier — confirm).
pub task_block_counts: HashMap<String, u32>,
/// Presumably identifiers of tasks that were given up on; starts empty (confirm with callers).
pub abandoned_tasks: Vec<String>,
/// Presumably how often an abandoned task was dispatched again; starts at 0 (confirm).
pub abandoned_task_redispatches: u32,
/// Presumably the number of malformed events in a row; starts at 0 (confirm with callers).
pub consecutive_malformed_events: u32,
/// Flag, initially `false`; presumably set once completion has been requested (confirm).
pub completion_requested: bool,
/// Per-hat counters, keyed by `HatId` (presumably activations per hat — confirm).
pub hat_activation_counts: HashMap<HatId, u32>,
/// Set of hats, initially empty; presumably hats that hit an activation limit (confirm).
pub exhausted_hats: HashSet<HatId>,
/// Presumably the time of the most recent check-in; `None` until one happens (confirm).
pub last_checkin_at: Option<Instant>,
/// Presumably the hats that were active most recently; starts empty (confirm with callers).
pub last_active_hat_ids: Vec<HatId>,
/// Topic of every event passed to `record_event` so far, deduplicated.
pub seen_topics: HashSet<String>,
/// Signature of the most recent event passed to `record_event`; `None` before
/// the first event.
pub last_emitted_signature: Option<EventSignature>,
/// Length of the current run of recorded events with identical signatures.
/// A `"task.complete"` event resets it to 0.
pub consecutive_same_signature: u32,
/// Flag, initially `false`; presumably set once cancellation has been requested (confirm).
pub cancellation_requested: bool,
}
impl Default for LoopState {
    /// Builds the initial state: every counter at zero, every flag cleared,
    /// every optional unset, every collection empty, and `started_at` pinned
    /// to the moment of construction.
    fn default() -> Self {
        // The clock reading is the only non-constant initial value.
        let started_at = Instant::now();

        Self {
            started_at,

            // Counters and the cost accumulator.
            iteration: 0,
            consecutive_failures: 0,
            consecutive_blocked: 0,
            consecutive_malformed_events: 0,
            consecutive_same_signature: 0,
            abandoned_task_redispatches: 0,
            cumulative_cost: 0.0,

            // Flags.
            completion_requested: false,
            cancellation_requested: false,

            // Optional values start unset.
            last_hat: Option::default(),
            last_blocked_hat: Option::default(),
            last_checkin_at: Option::default(),
            last_emitted_signature: Option::default(),

            // Collections start empty.
            task_block_counts: HashMap::default(),
            hat_activation_counts: HashMap::default(),
            exhausted_hats: HashSet::default(),
            seen_topics: HashSet::default(),
            abandoned_tasks: Vec::default(),
            last_active_hat_ids: Vec::default(),
        }
    }
}
impl LoopState {
    /// Creates a fresh state; identical to `LoopState::default()`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the time that has passed since `started_at`.
    pub fn elapsed(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Tells whether `event` participates in the repeated-signature count.
    ///
    /// Every topic counts except `"task.complete"`.
    fn event_counts_toward_stale_loop(event: &Event) -> bool {
        event.topic.as_str() != "task.complete"
    }

    /// Records `event` in the bookkeeping.
    ///
    /// The topic is always added to `seen_topics`. The repeated-signature
    /// tracking is then updated:
    ///
    /// * a `"task.complete"` event resets `consecutive_same_signature` to 0;
    /// * an event whose signature equals the previously stored one extends
    ///   the current run by one;
    /// * any other event starts a new run of length 1.
    ///
    /// In every case `last_emitted_signature` ends up holding the signature
    /// of `event`.
    pub fn record_event(&mut self, event: &Event) {
        self.seen_topics.insert(event.topic.to_string());

        let current = EventSignature::from_event(event);
        let counts = Self::event_counts_toward_stale_loop(event);
        let repeated = self.last_emitted_signature.as_ref() == Some(&current);

        match (counts, repeated) {
            // Exempt topic: clear the run but still remember what was seen.
            (false, _) => {
                self.consecutive_same_signature = 0;
                self.last_emitted_signature = Some(current);
            }
            // Same signature again: the stored one is already equal, keep it.
            (true, true) => {
                self.consecutive_same_signature += 1;
            }
            // A different signature: begin a new run.
            (true, false) => {
                self.consecutive_same_signature = 1;
                self.last_emitted_signature = Some(current);
            }
        }
    }

    /// Returns the entries of `required` whose topic has not been recorded
    /// yet, in their original order and borrowed from `required`.
    pub fn missing_required_events<'a>(&self, required: &'a [String]) -> Vec<&'a String> {
        let mut missing = Vec::new();
        for topic in required {
            if !self.seen_topics.contains(topic.as_str()) {
                missing.push(topic);
            }
        }
        missing
    }
}
impl EventSignature {
    /// Derives the signature of `event`.
    ///
    /// The topic is copied into an owned string, the source is cloned, and
    /// the payload is reduced to a 64-bit hash via `fingerprint_payload`.
    pub fn from_event(event: &Event) -> Self {
        let topic = event.topic.to_string();
        let source = event.source.clone();
        let payload_fingerprint = fingerprint_payload(&event.payload);

        Self {
            topic,
            source,
            payload_fingerprint,
        }
    }
}
/// Reduces `payload` to a 64-bit hash using the standard library's default
/// hasher.
///
/// Equal payloads always yield equal fingerprints within one build of the
/// program. The underlying algorithm is unspecified by the standard library,
/// so the values should not be persisted or compared across Rust releases.
fn fingerprint_payload(payload: &str) -> u64 {
    let mut state = DefaultHasher::default();
    Hash::hash(payload, &mut state);
    Hasher::finish(&state)
}
#[cfg(test)]
mod tests {
    use super::LoopState;
    use ralph_proto::Event;

    /// Topic of the last signature stored in `state`, if any.
    fn last_topic(state: &LoopState) -> Option<&str> {
        state
            .last_emitted_signature
            .as_ref()
            .map(|s| s.topic.as_str())
    }

    /// `task.complete` events must never build up a repeated-signature run,
    /// yet the last signature is still remembered.
    #[test]
    fn repeated_task_complete_does_not_accumulate_stale_loop_count() {
        let mut state = LoopState::new();

        state.record_event(&Event::new("task.complete", "task 1 complete"));
        assert_eq!(state.consecutive_same_signature, 0);

        for payload in ["task 2 complete", "task 3 complete"] {
            state.record_event(&Event::new("task.complete", payload));
        }
        assert_eq!(state.consecutive_same_signature, 0);
        assert_eq!(last_topic(&state), Some("task.complete"));
    }

    /// Identical events on any other topic extend the run by one each time.
    #[test]
    fn repeated_non_progress_topics_still_accumulate_stale_loop_count() {
        let mut state = LoopState::new();

        for _ in 0..3 {
            state.record_event(&Event::new("task.resume", "same payload"));
        }

        assert_eq!(state.consecutive_same_signature, 3);
        assert_eq!(last_topic(&state), Some("task.resume"));
    }
}