use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::constants::*;
fn default_true() -> bool {
true
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledEvent {
pub event_id: String,
pub repo: String,
pub trigger_at: String,
pub payload: serde_json::Value,
#[serde(default = "default_true")]
pub one_shot: bool,
pub interval_seconds: Option<u64>,
#[serde(default)]
pub description: String,
#[serde(default)]
pub retry_count: u32,
}
impl ScheduledEvent {
pub fn trigger_datetime(&self) -> Option<DateTime<Utc>> {
DateTime::parse_from_rfc3339(&self.trigger_at)
.ok()
.map(|dt| dt.with_timezone(&Utc))
}
pub fn is_due(&self, now: DateTime<Utc>) -> bool {
self.trigger_datetime().is_some_and(|dt| dt <= now)
}
}
pub struct ScheduledEventManager {
json_path: PathBuf,
events: Vec<ScheduledEvent>,
}
impl ScheduledEventManager {
pub fn new(json_path: impl AsRef<Path>) -> Self {
Self {
json_path: json_path.as_ref().to_path_buf(),
events: Vec::new(),
}
}
pub fn load(&mut self) -> std::io::Result<()> {
if let Some(parent) = self.json_path.parent() {
std::fs::create_dir_all(parent)?;
}
if !self.json_path.exists() {
self.events = Vec::new();
return Ok(());
}
let data = std::fs::read_to_string(&self.json_path)?;
match serde_json::from_str::<Vec<ScheduledEvent>>(&data) {
Ok(events) => {
self.events = events;
Ok(())
}
Err(e) => {
tracing::error!("Failed to parse scheduled events: {}", e);
self.events = Vec::new();
Ok(())
}
}
}
pub fn save(&self) -> std::io::Result<()> {
if let Some(parent) = self.json_path.parent() {
std::fs::create_dir_all(parent)?;
}
let data = serde_json::to_string_pretty(&self.events).map_err(std::io::Error::other)?;
let tmp_path = self.json_path.with_extension("tmp");
std::fs::write(&tmp_path, &data)?;
std::fs::rename(&tmp_path, &self.json_path)?;
Ok(())
}
pub fn create_event(
&mut self,
repo: &str,
trigger_at: DateTime<Utc>,
payload: serde_json::Value,
one_shot: bool,
interval_seconds: Option<u64>,
description: &str,
) -> String {
if !one_shot && interval_seconds.is_none() {
panic!("Recurring events (one_shot=false) require interval_seconds to be set");
}
if let Some(secs) = interval_seconds {
assert!(secs > 0, "interval_seconds must be positive, got {secs}");
}
let event_id = uuid::Uuid::new_v4().to_string()[..12].to_string();
let event = ScheduledEvent {
event_id: event_id.clone(),
repo: repo.to_string(),
trigger_at: trigger_at.to_rfc3339(),
payload,
one_shot,
interval_seconds,
description: description.to_string(),
retry_count: 0,
};
self.events.push(event);
let _ = self.save();
event_id
}
pub fn cancel_event(&mut self, event_id: &str) -> bool {
let before = self.events.len();
self.events.retain(|ev| ev.event_id != event_id);
let removed = self.events.len() < before;
if removed {
let _ = self.save();
}
removed
}
pub fn list_events(&self) -> &[ScheduledEvent] {
&self.events
}
pub fn due_events(&self, now: DateTime<Utc>) -> Vec<&ScheduledEvent> {
self.events.iter().filter(|ev| ev.is_due(now)).collect()
}
pub async fn fire_due_events_with_callback<F, Fut>(&mut self, callback: F) -> usize
where
F: Fn(String, serde_json::Value) -> Fut,
Fut: std::future::Future<Output = Result<(), String>>,
{
let now = chrono::Utc::now();
let mut fired = 0;
let mut remaining = Vec::new();
for event in self.events.drain(..) {
if event.is_due(now) {
match callback(event.repo.clone(), event.payload.clone()).await {
Ok(()) => {
fired += 1;
if !event.one_shot {
if let Some(interval) = event.interval_seconds {
let mut e = event;
if let Some(mut next) = e.trigger_datetime() {
while next <= now {
next += chrono::Duration::seconds(interval as i64);
}
e.trigger_at = next.to_rfc3339();
e.retry_count = 0;
remaining.push(e);
}
}
}
}
Err(_) => {
let mut e = event;
e.retry_count += 1;
if e.retry_count < MAX_SCHEDULED_EVENT_RETRY_COUNT {
remaining.push(e);
}
}
}
} else {
remaining.push(event);
}
}
self.events = remaining;
if fired > 0 {
let _ = self.save();
}
fired
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration;
use serde_json::json;
use tempfile::TempDir;
fn temp_manager() -> (ScheduledEventManager, TempDir) {
let dir = TempDir::new().unwrap();
let path = dir.path().join("scheduled.json");
let mgr = ScheduledEventManager::new(&path);
(mgr, dir)
}
#[test]
fn is_due_with_past_timestamp() {
let past = Utc::now() - Duration::hours(1);
let event = ScheduledEvent {
event_id: "test1".into(),
repo: "owner/repo".into(),
trigger_at: past.to_rfc3339(),
payload: json!({}),
one_shot: true,
interval_seconds: None,
description: String::new(),
retry_count: 0,
};
assert!(event.is_due(Utc::now()));
}
#[test]
fn is_due_with_future_timestamp() {
let future = Utc::now() + Duration::hours(1);
let event = ScheduledEvent {
event_id: "test2".into(),
repo: "owner/repo".into(),
trigger_at: future.to_rfc3339(),
payload: json!({}),
one_shot: true,
interval_seconds: None,
description: String::new(),
retry_count: 0,
};
assert!(!event.is_due(Utc::now()));
}
#[test]
fn create_event_adds_to_list_and_saves() {
let (mut mgr, _dir) = temp_manager();
let trigger = Utc::now() + Duration::hours(1);
let id = mgr.create_event(
"owner/repo",
trigger,
json!({"key": "val"}),
true,
None,
"test event",
);
assert_eq!(mgr.list_events().len(), 1);
assert_eq!(mgr.list_events()[0].event_id, id);
assert_eq!(mgr.list_events()[0].repo, "owner/repo");
assert_eq!(mgr.list_events()[0].description, "test event");
assert!(mgr.json_path.exists());
}
#[test]
fn cancel_event_removes_from_list() {
let (mut mgr, _dir) = temp_manager();
let trigger = Utc::now() + Duration::hours(1);
let id = mgr.create_event("owner/repo", trigger, json!({}), true, None, "");
assert_eq!(mgr.list_events().len(), 1);
assert!(mgr.cancel_event(&id));
assert_eq!(mgr.list_events().len(), 0);
}
#[test]
fn cancel_event_returns_false_for_nonexistent_id() {
let (mut mgr, _dir) = temp_manager();
assert!(!mgr.cancel_event("nonexistent_id"));
}
#[test]
fn list_events_returns_all_events() {
let (mut mgr, _dir) = temp_manager();
let trigger = Utc::now() + Duration::hours(1);
mgr.create_event("repo1", trigger, json!({}), true, None, "first");
mgr.create_event("repo2", trigger, json!({}), true, None, "second");
mgr.create_event("repo3", trigger, json!({}), true, None, "third");
assert_eq!(mgr.list_events().len(), 3);
}
#[test]
fn due_events_returns_only_due_events() {
let (mut mgr, _dir) = temp_manager();
let now = Utc::now();
let past = now - Duration::hours(1);
let future = now + Duration::hours(1);
mgr.create_event("past-repo", past, json!({}), true, None, "past");
mgr.create_event("future-repo", future, json!({}), true, None, "future");
let due = mgr.due_events(now);
assert_eq!(due.len(), 1);
assert_eq!(due[0].repo, "past-repo");
}
#[test]
fn load_and_save_roundtrip() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("scheduled.json");
{
let mut mgr = ScheduledEventManager::new(&path);
let trigger = Utc::now() + Duration::hours(1);
mgr.create_event(
"owner/repo",
trigger,
json!({"action": "test"}),
true,
None,
"roundtrip",
);
}
{
let mut mgr = ScheduledEventManager::new(&path);
mgr.load().unwrap();
assert_eq!(mgr.list_events().len(), 1);
assert_eq!(mgr.list_events()[0].repo, "owner/repo");
assert_eq!(mgr.list_events()[0].description, "roundtrip");
}
}
#[test]
fn load_from_nonexistent_file_starts_empty() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("does_not_exist.json");
let mut mgr = ScheduledEventManager::new(&path);
mgr.load().unwrap();
assert!(mgr.list_events().is_empty());
}
#[test]
#[should_panic(expected = "Recurring events (one_shot=false) require interval_seconds")]
fn recurring_event_requires_interval_seconds() {
let (mut mgr, _dir) = temp_manager();
let trigger = Utc::now() + Duration::hours(1);
mgr.create_event(
"owner/repo",
trigger,
json!({}),
false,
None,
"bad recurring",
);
}
}