use std::time::Duration;
use aion_core::{TimerId, WorkflowId};
use chrono::{DateTime, Utc};
use crate::time::{TimerService, TimerServiceError};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SleepTimer {
pub timer_id: TimerId,
pub fire_at: DateTime<Utc>,
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum SleepTimerError {
#[error("sleep duration cannot be represented as a chrono duration")]
DurationOutOfRange,
#[error("sleep fire_at timestamp overflowed recorded workflow time")]
FireAtOutOfRange,
#[error("sleep timer scheduling failed: {0}")]
Timer(#[from] TimerServiceError),
}
pub async fn start_timer(
service: &TimerService,
workflow_id: WorkflowId,
timer_id: TimerId,
fire_at: DateTime<Utc>,
) -> Result<(), TimerServiceError> {
service.schedule(workflow_id, timer_id, fire_at).await
}
pub async fn cancel_timer(
service: &TimerService,
workflow_id: WorkflowId,
timer_id: TimerId,
) -> Result<(), TimerServiceError> {
service.cancel(workflow_id, timer_id).await
}
pub async fn sleep(
service: &TimerService,
workflow_id: WorkflowId,
duration: Duration,
recorded_now: DateTime<Utc>,
sequence_position: u64,
) -> Result<SleepTimer, SleepTimerError> {
let chrono_duration =
chrono::Duration::from_std(duration).map_err(|_| SleepTimerError::DurationOutOfRange)?;
let fire_at = recorded_now
.checked_add_signed(chrono_duration)
.ok_or(SleepTimerError::FireAtOutOfRange)?;
let timer_id = TimerId::anonymous(sequence_position);
service
.schedule(workflow_id, timer_id.clone(), fire_at)
.await?;
Ok(SleepTimer { timer_id, fire_at })
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use aion_core::{Event, EventEnvelope, IdError, TimerId, WorkflowId};
use aion_store::{InMemoryStore, ReadableEventStore, StoreError, WritableEventStore};
use chrono::{DateTime, Utc};
use super::{SleepTimerError, cancel_timer, sleep, start_timer};
use crate::engine_seam::test_support::{FakeEngineHandle, FakeEngineOperation};
use crate::engine_seam::{
EngineHandle, TimerWheelEntry, WorkflowProcessHandle, WorkflowResidency,
};
use crate::time::{TimerService, TimerServiceError};
#[derive(thiserror::Error, Debug)]
enum TestError {
#[error(transparent)]
Timer(#[from] TimerServiceError),
#[error(transparent)]
Sleep(#[from] SleepTimerError),
#[error(transparent)]
Store(#[from] StoreError),
#[error(transparent)]
Engine(#[from] crate::engine_seam::EngineSeamError),
#[error(transparent)]
Id(#[from] IdError),
}
fn instant(offset_seconds: i64) -> DateTime<Utc> {
DateTime::from_timestamp(1_700_000_000 + offset_seconds, 0).unwrap_or_default()
}
fn recorded_at() -> DateTime<Utc> {
instant(1)
}
fn workflow_id() -> WorkflowId {
WorkflowId::new_v4()
}
fn service() -> (Arc<InMemoryStore>, Arc<FakeEngineHandle>, TimerService) {
let concrete_store = Arc::new(InMemoryStore::default());
let writable: Arc<dyn WritableEventStore> = concrete_store.clone();
let readable: Arc<dyn ReadableEventStore> = concrete_store.clone();
let engine = Arc::new(FakeEngineHandle::recording_to(writable));
let service = TimerService::with_recorded_at(engine.clone(), readable, recorded_at);
(concrete_store, engine, service)
}
async fn history(
store: &InMemoryStore,
workflow_id: &WorkflowId,
) -> Result<Vec<Event>, StoreError> {
store.read_history(workflow_id).await
}
fn timer_started_event(workflow_id: &WorkflowId, timer_id: &TimerId, seq: u64) -> Event {
Event::TimerStarted {
envelope: EventEnvelope {
seq,
recorded_at: instant(0),
workflow_id: workflow_id.clone(),
},
timer_id: timer_id.clone(),
fire_at: instant(5),
}
}
fn count_cancelled(events: &[Event], timer_id: &TimerId) -> usize {
events
.iter()
.filter(|event| {
matches!(event, Event::TimerCancelled { timer_id: recorded, .. } if recorded == timer_id)
})
.count()
}
fn count_fired(events: &[Event], timer_id: &TimerId) -> usize {
events
.iter()
.filter(|event| {
matches!(event, Event::TimerFired { timer_id: recorded, .. } if recorded == timer_id)
})
.count()
}
#[tokio::test]
async fn start_timer_preserves_named_id_in_history_and_timer_row() -> Result<(), TestError> {
let (store, _engine, service) = service();
let workflow_id = workflow_id();
let timer_id = TimerId::named("deadline")?;
let fire_at = instant(10);
start_timer(&service, workflow_id.clone(), timer_id.clone(), fire_at).await?;
let expired = store.expired_timers(fire_at).await?;
assert_eq!(expired.len(), 1);
assert_eq!(expired[0].workflow_id, workflow_id);
assert_eq!(expired[0].timer_id, timer_id);
assert_eq!(expired[0].fire_at, fire_at);
let history = history(&store, &workflow_id).await?;
assert!(history.is_empty());
Ok(())
}
#[tokio::test]
async fn cancel_timer_disarms_resident_wheel_and_records_cancelled() -> Result<(), TestError> {
let process = WorkflowProcessHandle::new(42);
let (store, engine, service) = service();
let workflow_id = workflow_id();
let timer_id = TimerId::named("deadline")?;
let fire_at = instant(20);
engine.set_residency(workflow_id.clone(), WorkflowResidency::Resident(process))?;
engine.record_workflow_event(
&workflow_id,
timer_started_event(&workflow_id, &timer_id, 1),
)?;
start_timer(&service, workflow_id.clone(), timer_id.clone(), fire_at).await?;
cancel_timer(&service, workflow_id.clone(), timer_id.clone()).await?;
assert!(engine.armed_timers()?.is_empty());
let history = history(&store, &workflow_id).await?;
assert_eq!(count_cancelled(&history, &timer_id), 1);
assert!(matches!(
history.as_slice(),
[
Event::TimerStarted { .. },
Event::TimerCancelled {
envelope,
timer_id: recorded,
}
] if envelope.seq == 2 && recorded == &timer_id
));
assert!(engine.operations()?.iter().any(|operation| matches!(
operation,
FakeEngineOperation::TimerDisarmed { process: disarmed_process, timer_id: disarmed }
if disarmed_process == &process && disarmed == &timer_id
)));
Ok(())
}
#[tokio::test]
async fn cancel_timer_after_fire_is_noop() -> Result<(), TestError> {
let process = WorkflowProcessHandle::new(42);
let (store, engine, service) = service();
let workflow_id = workflow_id();
let timer_id = TimerId::named("deadline")?;
let fire_at = instant(30);
engine.set_residency(workflow_id.clone(), WorkflowResidency::Resident(process))?;
engine.record_workflow_event(
&workflow_id,
timer_started_event(&workflow_id, &timer_id, 1),
)?;
start_timer(&service, workflow_id.clone(), timer_id.clone(), fire_at).await?;
service
.fire_timer(workflow_id.clone(), timer_id.clone(), fire_at)
.await?;
let operation_count = engine.operations()?.len();
cancel_timer(&service, workflow_id.clone(), timer_id.clone()).await?;
let history = history(&store, &workflow_id).await?;
assert_eq!(count_fired(&history, &timer_id), 1);
assert_eq!(count_cancelled(&history, &timer_id), 0);
assert_eq!(engine.operations()?.len(), operation_count);
Ok(())
}
#[tokio::test]
async fn cancel_timer_after_cancel_is_idempotent_noop() -> Result<(), TestError> {
let process = WorkflowProcessHandle::new(42);
let (store, engine, service) = service();
let workflow_id = workflow_id();
let timer_id = TimerId::named("deadline")?;
let fire_at = instant(40);
engine.set_residency(workflow_id.clone(), WorkflowResidency::Resident(process))?;
engine.record_workflow_event(
&workflow_id,
timer_started_event(&workflow_id, &timer_id, 1),
)?;
start_timer(&service, workflow_id.clone(), timer_id.clone(), fire_at).await?;
cancel_timer(&service, workflow_id.clone(), timer_id.clone()).await?;
let operation_count = engine.operations()?.len();
cancel_timer(&service, workflow_id.clone(), timer_id.clone()).await?;
let history = history(&store, &workflow_id).await?;
assert_eq!(count_cancelled(&history, &timer_id), 1);
assert_eq!(engine.operations()?.len(), operation_count);
Ok(())
}
#[tokio::test]
async fn cancel_timer_settles_anonymous_scope_deadline() -> Result<(), TestError> {
let process = WorkflowProcessHandle::new(42);
let (store, engine, service) = service();
let workflow_id = workflow_id();
let timer_id = TimerId::anonymous(42);
let fire_at = instant(40);
engine.set_residency(workflow_id.clone(), WorkflowResidency::Resident(process))?;
engine.record_workflow_event(
&workflow_id,
timer_started_event(&workflow_id, &timer_id, 1),
)?;
service
.schedule(workflow_id.clone(), timer_id.clone(), fire_at)
.await?;
cancel_timer(&service, workflow_id.clone(), timer_id.clone()).await?;
let history = history(&store, &workflow_id).await?;
assert_eq!(count_cancelled(&history, &timer_id), 1);
assert_eq!(count_fired(&history, &timer_id), 0);
Ok(())
}
#[tokio::test]
async fn sleep_derives_anonymous_id_and_fire_at_from_recorded_inputs() -> Result<(), TestError>
{
let (store, _engine, service) = service();
let workflow_id = workflow_id();
let recorded_now = instant(50);
let duration = Duration::from_secs(15);
let sequence_position = 9;
let expected_timer_id = TimerId::anonymous(sequence_position);
let expected_fire_at = instant(65);
let scheduled = sleep(
&service,
workflow_id.clone(),
duration,
recorded_now,
sequence_position,
)
.await?;
assert_eq!(scheduled.timer_id, expected_timer_id);
assert_eq!(scheduled.fire_at, expected_fire_at);
let history = history(&store, &workflow_id).await?;
assert!(history.is_empty());
Ok(())
}
#[tokio::test]
async fn start_timer_arms_named_timer_without_rewriting_id() -> Result<(), TestError> {
let process = WorkflowProcessHandle::new(42);
let (_store, engine, service) = service();
let workflow_id = workflow_id();
let timer_id = TimerId::named("deadline")?;
let fire_at = instant(70);
engine.set_residency(workflow_id.clone(), WorkflowResidency::Resident(process))?;
start_timer(&service, workflow_id, timer_id.clone(), fire_at).await?;
assert_eq!(
engine.armed_timers()?,
vec![TimerWheelEntry {
process,
timer_id,
fire_at,
}]
);
Ok(())
}
}