use aion_core::{Event, RunId, ScheduleConfig, ScheduleId, WorkflowId};
use chrono::{DateTime, Utc};
use crate::schedule::{ScheduleError, next_fire_time};
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ScheduleExecution {
pub workflow_id: WorkflowId,
pub run_id: RunId,
}
impl ScheduleExecution {
#[must_use]
pub const fn new(workflow_id: WorkflowId, run_id: RunId) -> Self {
Self {
workflow_id,
run_id,
}
}
}
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct ScheduleState {
pub schedule_id: ScheduleId,
pub config: ScheduleConfig,
pub is_paused: bool,
pub is_deleted: bool,
pub last_triggered_at: Option<DateTime<Utc>>,
pub next_trigger_at: DateTime<Utc>,
pub current_execution: Option<ScheduleExecution>,
}
impl ScheduleState {
pub fn created(
schedule_id: ScheduleId,
config: ScheduleConfig,
recorded_at: DateTime<Utc>,
) -> Result<Self, ScheduleError> {
let next_trigger_at = next_fire_time(&config.trigger, recorded_at)?;
Ok(Self {
schedule_id,
config,
is_paused: false,
is_deleted: false,
last_triggered_at: None,
next_trigger_at,
current_execution: None,
})
}
pub fn apply(&mut self, event: &Event) -> Result<(), ScheduleError> {
match event {
Event::ScheduleCreated {
schedule_id,
config,
envelope,
}
| Event::ScheduleUpdated {
schedule_id,
config,
envelope,
} if schedule_id == &self.schedule_id => {
self.config = config.clone();
self.next_trigger_at = next_fire_time(&self.config.trigger, envelope.recorded_at)?;
}
Event::SchedulePaused { schedule_id, .. } if schedule_id == &self.schedule_id => {
self.is_paused = true;
}
Event::ScheduleResumed {
schedule_id,
envelope,
} if schedule_id == &self.schedule_id => {
self.is_paused = false;
self.next_trigger_at = next_fire_time(&self.config.trigger, envelope.recorded_at)?;
}
Event::ScheduleDeleted { schedule_id, .. } if schedule_id == &self.schedule_id => {
self.is_deleted = true;
}
Event::ScheduleTriggered {
schedule_id,
workflow_id,
run_id,
envelope,
} if schedule_id == &self.schedule_id => {
self.last_triggered_at = Some(envelope.recorded_at);
self.current_execution =
Some(ScheduleExecution::new(workflow_id.clone(), run_id.clone()));
self.next_trigger_at = next_fire_time(&self.config.trigger, envelope.recorded_at)?;
}
_ => {}
}
Ok(())
}
#[must_use]
pub const fn is_active(&self) -> bool {
!self.is_paused && !self.is_deleted
}
pub const fn set_next_trigger_at(&mut self, next_trigger_at: DateTime<Utc>) {
self.next_trigger_at = next_trigger_at;
}
pub fn record_triggered(&mut self, execution: ScheduleExecution, recorded_at: DateTime<Utc>) {
self.last_triggered_at = Some(recorded_at);
self.current_execution = Some(execution);
}
}
pub fn project_schedule_state(events: &[Event]) -> Result<Vec<ScheduleState>, ScheduleError> {
let mut states = Vec::<ScheduleState>::new();
for event in events {
if let Event::ScheduleCreated {
schedule_id,
config,
envelope,
} = event
{
if !states.iter().any(|state| &state.schedule_id == schedule_id) {
states.push(ScheduleState::created(
schedule_id.clone(),
config.clone(),
envelope.recorded_at,
)?);
continue;
}
}
for state in &mut states {
state.apply(event)?;
}
}
Ok(states)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use aion_core::{
CatchUpPolicy, EventEnvelope, OverlapPolicy, Payload, ScheduleConfig, TriggerSpec,
};
use chrono::{DateTime, Utc};
use serde_json::json;
use super::*;
fn parse_utc(value: &str) -> Result<DateTime<Utc>, chrono::ParseError> {
DateTime::parse_from_rfc3339(value).map(|date_time| date_time.with_timezone(&Utc))
}
fn config(label: &str, period_secs: u64) -> Result<ScheduleConfig, aion_core::PayloadError> {
Ok(ScheduleConfig {
trigger: TriggerSpec::Interval {
period: Duration::from_secs(period_secs),
},
overlap_policy: OverlapPolicy::Skip,
catch_up_policy: CatchUpPolicy::One,
workflow_type: String::from("checkout"),
input: Payload::from_json(&json!({ "label": label }))?,
search_attributes: std::collections::HashMap::new(),
})
}
fn envelope(seq: u64, recorded_at: DateTime<Utc>) -> EventEnvelope {
EventEnvelope {
seq,
recorded_at,
workflow_id: WorkflowId::new_v4(),
}
}
#[test]
fn schedule_events_project_state_fields() -> Result<(), Box<dyn std::error::Error>> {
let schedule_id = ScheduleId::new_v4();
let created_at = parse_utc("2026-06-07T00:00:00Z")?;
let updated_at = parse_utc("2026-06-07T00:01:00Z")?;
let triggered_at = parse_utc("2026-06-07T00:02:00Z")?;
let workflow_id = WorkflowId::new_v4();
let run_id = RunId::new_v4();
let created_config = config("created", 60)?;
let updated_config = config("updated", 120)?;
let events = vec![
Event::ScheduleCreated {
envelope: envelope(1, created_at),
schedule_id: schedule_id.clone(),
config: created_config,
},
Event::ScheduleUpdated {
envelope: envelope(2, updated_at),
schedule_id: schedule_id.clone(),
config: updated_config.clone(),
},
Event::SchedulePaused {
envelope: envelope(3, updated_at),
schedule_id: schedule_id.clone(),
},
Event::ScheduleResumed {
envelope: envelope(4, updated_at),
schedule_id: schedule_id.clone(),
},
Event::ScheduleTriggered {
envelope: envelope(5, triggered_at),
schedule_id: schedule_id.clone(),
workflow_id: workflow_id.clone(),
run_id: run_id.clone(),
},
Event::ScheduleDeleted {
envelope: envelope(6, triggered_at),
schedule_id: schedule_id.clone(),
},
];
let projected = project_schedule_state(&events)?;
let state = projected
.iter()
.find(|state| state.schedule_id == schedule_id)
.ok_or("missing projected schedule")?;
assert_eq!(state.config, updated_config);
assert!(!state.is_paused);
assert!(state.is_deleted);
assert_eq!(state.last_triggered_at, Some(triggered_at));
assert_eq!(
state.current_execution,
Some(ScheduleExecution::new(workflow_id, run_id))
);
assert_eq!(
state.next_trigger_at,
triggered_at + chrono::Duration::seconds(120)
);
Ok(())
}
#[test]
fn created_state_is_unpaused_with_initial_next_trigger()
-> Result<(), Box<dyn std::error::Error>> {
let recorded_at = parse_utc("2026-06-07T00:00:00Z")?;
let state =
ScheduleState::created(ScheduleId::new_v4(), config("created", 30)?, recorded_at)?;
assert!(!state.is_paused);
assert!(!state.is_deleted);
assert_eq!(
state.next_trigger_at,
recorded_at + chrono::Duration::seconds(30)
);
Ok(())
}
}