use std::time::Duration;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Action attached to a [`WatchRule`] and handed back by
/// [`TimeoutWatcher::evaluate`] when that rule matches.
///
/// Serialized with serde's internally tagged representation: the variant name
/// is stored under a `kind` field alongside the variant's own fields.
///
/// This file only carries the value; what each variant triggers is decided by
/// whoever consumes it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum WatchAction {
    /// Alert variant, carrying the routing targets for the notification.
    Alert(EscalationRouting),
    /// Suspend variant; carries no payload.
    Suspend,
    /// Page variant, carrying the routing targets for the notification.
    Page(EscalationRouting),
}
/// Optional delivery targets carried by [`WatchAction::Alert`] and
/// [`WatchAction::Page`].
///
/// Every field is optional and the `Default` value has all of them unset; see
/// [`EscalationRouting::is_empty`]. The fields are plain strings and nothing in
/// this file validates their contents.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct EscalationRouting {
    /// Presumably an ntfy topic name — confirm against the consumer.
    pub ntfy_topic: Option<String>,
    /// Presumably a Slack channel identifier — confirm against the consumer.
    pub slack_channel: Option<String>,
    /// Presumably the name of a GitHub issue template — confirm against the consumer.
    pub github_issue_template: Option<String>,
    /// Generic routing key; its meaning is defined by the consumer.
    pub routing_key: Option<String>,
}
impl EscalationRouting {
    /// Returns `true` when no routing target is configured, i.e. every field
    /// is `None`.
    pub fn is_empty(&self) -> bool {
        // Destructure exhaustively so that adding a field without updating
        // this check becomes a compile error instead of a silent gap.
        let Self {
            ntfy_topic,
            slack_channel,
            github_issue_template,
            routing_key,
        } = self;

        [ntfy_topic, slack_channel, github_issue_template, routing_key]
            .iter()
            .all(|target| target.is_none())
    }
}
/// A single timeout rule held by a [`TimeoutWatcher`].
///
/// The rule matches when the watched entity is in `state` and has been there
/// for at least `threshold`; [`TimeoutWatcher::evaluate`] then yields `action`.
#[derive(Debug, Clone)]
pub struct WatchRule<S> {
    /// State this rule applies to (compared with `==` during evaluation).
    pub state: S,
    /// Minimum time spent in `state` before the rule fires (inclusive).
    pub threshold: Duration,
    /// Action returned when the rule fires.
    pub action: WatchAction,
}
/// Ordered collection of [`WatchRule`]s keyed by a caller-defined state type `S`.
///
/// Rules are stored in insertion order.
#[derive(Debug, Clone)]
pub struct TimeoutWatcher<S> {
    /// Rules in the order they were added.
    rules: Vec<WatchRule<S>>,
}

// Implemented by hand rather than derived: `#[derive(Default)]` would add an
// `S: Default` bound, yet an empty rule list never needs a default state value.
impl<S> Default for TimeoutWatcher<S> {
    /// Returns a watcher with no rules.
    fn default() -> Self {
        Self { rules: Vec::new() }
    }
}
impl<S> TimeoutWatcher<S> {
    /// Creates a watcher that holds no rules.
    pub fn new() -> Self {
        Self { rules: vec![] }
    }

    /// Builder-style method: appends a rule for `state` and returns the watcher.
    ///
    /// Rules keep their insertion order.
    #[must_use]
    pub fn with_rule(mut self, state: S, threshold: Duration, action: WatchAction) -> Self {
        let appended = WatchRule {
            state,
            threshold,
            action,
        };
        self.rules.push(appended);
        self
    }

    /// Number of rules registered so far.
    pub fn rule_count(&self) -> usize {
        let Self { rules } = self;
        rules.len()
    }

    /// Returns `true` when no rule has been registered.
    pub fn is_empty(&self) -> bool {
        self.rule_count() == 0
    }
}
impl<S: PartialEq> TimeoutWatcher<S> {
    /// Finds the action to take for an entity that entered `state` at
    /// `entered_at`, as observed at `now`.
    ///
    /// Rules are checked in insertion order and the first one whose state
    /// equals `state` and whose threshold has been reached (inclusive) wins.
    /// Returns `None` when no rule qualifies.
    ///
    /// The time in state is computed by `elapsed_nonneg`, so a `now` earlier
    /// than `entered_at` counts as zero elapsed time.
    pub fn evaluate(
        &self,
        state: &S,
        entered_at: DateTime<Utc>,
        now: DateTime<Utc>,
    ) -> Option<&WatchAction> {
        let time_in_state = elapsed_nonneg(entered_at, now);
        self.rules
            .iter()
            .find(|candidate| candidate.state == *state && time_in_state >= candidate.threshold)
            .map(|matched| &matched.action)
    }
}
/// Time elapsed from `entered_at` to `now` as a non-negative
/// `std::time::Duration`.
///
/// Falls back to `Duration::ZERO` when the signed difference cannot be
/// converted to a `std::time::Duration` (e.g. `now` is earlier than
/// `entered_at`).
fn elapsed_nonneg(entered_at: DateTime<Utc>, now: DateTime<Utc>) -> Duration {
    match now.signed_duration_since(entered_at).to_std() {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    }
}
/// A schedule expression together with the length of time after each
/// scheduled instant during which the run is still considered on time.
///
/// `cron` and `timezone` are stored as plain strings; nothing in the visible
/// code parses or validates them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ScheduleWindow {
    /// Schedule expression, stored verbatim.
    pub cron: String,
    /// Timezone name, stored verbatim (`ScheduleWindow::new` sets `"UTC"`).
    pub timezone: String,
    /// Length of the window that opens at the scheduled instant
    /// (`ScheduleWindow::new` sets five minutes).
    pub window: Duration,
}
impl ScheduleWindow {
pub fn new(cron: impl Into<String>) -> Self {
Self {
cron: cron.into(),
timezone: "UTC".into(),
window: Duration::from_secs(5 * 60),
}
}
#[must_use]
pub fn with_timezone(mut self, tz: impl Into<String>) -> Self {
self.timezone = tz.into();
self
}
#[must_use]
pub fn with_window(mut self, window: Duration) -> Self {
self.window = window;
self
}
pub fn is_in_window(&self, scheduled_at: DateTime<Utc>, now: DateTime<Utc>) -> bool {
if now < scheduled_at {
return false;
}
let window_end = scheduled_at
+ chrono::Duration::from_std(self.window).unwrap_or_else(|_| chrono::Duration::zero());
now <= window_end
}
pub fn is_window_closed(&self, scheduled_at: DateTime<Utc>, now: DateTime<Utc>) -> bool {
if now < scheduled_at {
return false;
}
let window_end = scheduled_at
+ chrono::Duration::from_std(self.window).unwrap_or_else(|_| chrono::Duration::zero());
now > window_end
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Small state type used to parameterize the watcher in these tests.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum Phase {
        Compiling,
        Planning,
        Applying,
        Ready,
    }

    /// Routing with only the ntfy topic set.
    fn routing_to(topic: &str) -> EscalationRouting {
        EscalationRouting {
            ntfy_topic: Some(String::from(topic)),
            ..EscalationRouting::default()
        }
    }

    /// UTC instant `secs` seconds after the Unix epoch.
    fn t(secs: i64) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp(secs, 0).unwrap()
    }

    /// Watcher holding a single `Compiling` alert rule.
    fn compiling_alert(threshold_secs: u64, topic: &str) -> TimeoutWatcher<Phase> {
        TimeoutWatcher::<Phase>::new().with_rule(
            Phase::Compiling,
            Duration::from_secs(threshold_secs),
            WatchAction::Alert(routing_to(topic)),
        )
    }

    /// Schedule with the given cron expression and a window of `window_secs`.
    fn schedule(cron: &str, window_secs: u64) -> ScheduleWindow {
        ScheduleWindow::new(cron).with_window(Duration::from_secs(window_secs))
    }

    // ---- TimeoutWatcher -------------------------------------------------

    #[test]
    fn empty_watcher_is_noop() {
        let watcher: TimeoutWatcher<Phase> = TimeoutWatcher::new();
        assert!(watcher.is_empty());
        assert_eq!(watcher.evaluate(&Phase::Compiling, t(0), t(99_999)), None);
    }

    #[test]
    fn threshold_not_yet_elapsed_returns_none() {
        let watcher = compiling_alert(300, "compile-stuck");
        for now_secs in [100, 299] {
            assert_eq!(watcher.evaluate(&Phase::Compiling, t(0), t(now_secs)), None);
        }
    }

    #[test]
    fn threshold_exactly_at_returns_action() {
        let watcher = compiling_alert(300, "compile-stuck");
        let other = watcher.evaluate(&Phase::Compiling, t(0), t(300));
        if let Some(WatchAction::Alert(routing)) = other {
            assert_eq!(routing.ntfy_topic.as_deref(), Some("compile-stuck"));
        } else {
            panic!("expected Alert, got {other:?}");
        }
    }

    #[test]
    fn threshold_exceeded_returns_action() {
        let watcher = compiling_alert(300, "c");
        let outcome = watcher.evaluate(&Phase::Compiling, t(0), t(500));
        assert!(outcome.is_some());
    }

    #[test]
    fn different_state_does_not_match() {
        let watcher = compiling_alert(60, "c");
        for unrelated in [Phase::Planning, Phase::Ready] {
            assert_eq!(watcher.evaluate(&unrelated, t(0), t(99_999)), None);
        }
    }

    #[test]
    fn multiple_rules_first_match_wins() {
        // Both rules have fired by t = 200; the one added first must win even
        // though the second has the lower threshold.
        let watcher = compiling_alert(100, "first").with_rule(
            Phase::Compiling,
            Duration::from_secs(50),
            WatchAction::Page(routing_to("second")),
        );
        let other = watcher.evaluate(&Phase::Compiling, t(0), t(200));
        if let Some(WatchAction::Alert(routing)) = other {
            assert_eq!(routing.ntfy_topic.as_deref(), Some("first"));
        } else {
            panic!("expected first-match Alert, got {other:?}");
        }
    }

    #[test]
    fn per_state_thresholds() {
        let watcher = compiling_alert(5 * 60, "compile-stuck")
            .with_rule(
                Phase::Planning,
                Duration::from_secs(10 * 60),
                WatchAction::Alert(routing_to("plan-stuck")),
            )
            .with_rule(
                Phase::Applying,
                Duration::from_secs(30 * 60),
                WatchAction::Page(routing_to("apply-stuck")),
            );
        let six_minutes = t(6 * 60);

        // Only the five-minute Compiling rule has been exceeded.
        let other = watcher.evaluate(&Phase::Compiling, t(0), six_minutes);
        if let Some(WatchAction::Alert(routing)) = other {
            assert_eq!(routing.ntfy_topic.as_deref(), Some("compile-stuck"));
        } else {
            panic!("expected Alert, got {other:?}");
        }
        assert_eq!(watcher.evaluate(&Phase::Planning, t(0), six_minutes), None);
        assert_eq!(watcher.evaluate(&Phase::Applying, t(0), six_minutes), None);
    }

    #[test]
    fn clock_skew_returns_none() {
        // `now` is earlier than `entered_at`.
        let watcher = compiling_alert(1, "c");
        assert_eq!(watcher.evaluate(&Phase::Compiling, t(1000), t(500)), None);
    }

    #[test]
    fn watch_action_serializes_round_trip() {
        let original = WatchAction::Alert(routing_to("topic-a"));
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: WatchAction = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn escalation_routing_is_empty_when_all_none() {
        assert!(EscalationRouting::default().is_empty());
        assert!(!routing_to("anything").is_empty());
    }

    #[test]
    fn determinism_law() {
        let watcher = compiling_alert(60, "c");
        crate::testing::assert_deterministic_over(
            &[0_i64, 30, 59, 60, 61, 1000],
            |&secs| watcher.evaluate(&Phase::Compiling, t(0), t(secs)),
        );
    }

    // ---- ScheduleWindow -------------------------------------------------

    #[test]
    fn schedule_window_defaults_utc_and_5min() {
        let ScheduleWindow {
            cron,
            timezone,
            window,
        } = ScheduleWindow::new("0 2 * * *");
        assert_eq!(cron, "0 2 * * *");
        assert_eq!(timezone, "UTC");
        assert_eq!(window, Duration::from_secs(5 * 60));
    }

    #[test]
    fn schedule_window_builder_overrides() {
        let customized = schedule("* * * * *", 120).with_timezone("America/New_York");
        assert_eq!(customized.timezone, "America/New_York");
        assert_eq!(customized.window, Duration::from_secs(120));
    }

    #[test]
    fn schedule_window_now_before_scheduled_is_not_in_window() {
        let nightly = schedule("0 2 * * *", 300);
        let (scheduled, now) = (t(1000), t(999));
        assert!(!nightly.is_in_window(scheduled, now));
        assert!(!nightly.is_window_closed(scheduled, now));
    }

    #[test]
    fn schedule_window_now_exactly_at_scheduled_is_in_window() {
        let nightly = schedule("0 2 * * *", 300);
        let (scheduled, now) = (t(1000), t(1000));
        assert!(nightly.is_in_window(scheduled, now));
        assert!(!nightly.is_window_closed(scheduled, now));
    }

    #[test]
    fn schedule_window_within_window_is_in_window() {
        let nightly = schedule("0 2 * * *", 300);
        let (scheduled, now) = (t(1000), t(1200));
        assert!(nightly.is_in_window(scheduled, now));
        assert!(!nightly.is_window_closed(scheduled, now));
    }

    #[test]
    fn schedule_window_at_window_boundary_is_in_window() {
        let nightly = schedule("0 2 * * *", 300);
        let (scheduled, now) = (t(1000), t(1300));
        assert!(nightly.is_in_window(scheduled, now));
        assert!(!nightly.is_window_closed(scheduled, now));
    }

    #[test]
    fn schedule_window_past_window_is_closed() {
        let nightly = schedule("0 2 * * *", 300);
        for now_secs in [1301, 99_999] {
            assert!(!nightly.is_in_window(t(1000), t(now_secs)));
            assert!(nightly.is_window_closed(t(1000), t(now_secs)));
        }
    }

    #[test]
    fn schedule_window_in_window_and_closed_are_complementary_after_scheduled() {
        let every_minute = schedule("* * * * *", 60);
        let scheduled = t(1000);
        for offset in [0, 1, 30, 59, 60, 61, 100, 1000] {
            let now = t(1000 + offset);
            let in_w = every_minute.is_in_window(scheduled, now);
            let closed = every_minute.is_window_closed(scheduled, now);
            assert!(
                in_w != closed,
                "exactly one of in_window/closed must be true at offset {offset}; got in={in_w}, closed={closed}"
            );
        }
    }

    #[test]
    fn schedule_window_serde_round_trip() {
        let original = schedule("0 2 * * *", 600).with_timezone("America/New_York");
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ScheduleWindow = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn schedule_window_zero_window_only_fires_at_exact_moment() {
        let instant_only = ScheduleWindow::new("* * * * *").with_window(Duration::ZERO);
        let scheduled = t(1000);
        assert!(instant_only.is_in_window(scheduled, t(1000)), "zero window is exactly the scheduled instant");
        assert!(!instant_only.is_in_window(scheduled, t(1001)), "one second past is already closed");
    }
}