use std::sync::Arc;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};
use super::RuntimeConfig;
use super::registry::WorkflowRuntime;
use crate::store::{OutboxStore, Store, WorkflowQueryStore};
/// Background worker that periodically claims a due timer from the outbox and
/// feeds its payload into the owning workflow through the runtime.
///
/// `S` is the store type the workflow runtime is parameterised over; `O` is the
/// outbox implementation used to claim timers and record their outcome.
pub(crate) struct TimerWorker<S, O>
where
S: Store + WorkflowQueryStore,
O: OutboxStore,
{
/// Runtime whose `service()` executes the timer payload against a workflow.
runtime: Arc<WorkflowRuntime<S>>,
/// Outbox used to claim timers and to mark them processed or failed.
outbox: O,
/// Supplies the poll interval, the timer lock duration and the retry policy.
config: RuntimeConfig,
/// Identifier passed to every outbox call and attached to log events.
worker_id: String,
/// Workflow type names handed to `claim_timer` when claiming work.
registered_types: Arc<Vec<String>>,
}
impl<S, O> TimerWorker<S, O>
where
    S: Store + WorkflowQueryStore,
    O: OutboxStore,
{
    /// Builds a timer worker from its collaborators.
    ///
    /// Nothing is started here; call [`Self::run`] to begin polling.
    pub fn new(
        runtime: Arc<WorkflowRuntime<S>>,
        outbox: O,
        config: RuntimeConfig,
        worker_id: String,
        registered_types: Arc<Vec<String>>,
    ) -> Self {
        Self {
            runtime,
            outbox,
            config,
            worker_id,
            registered_types,
        }
    }

    /// Runs the polling loop until shutdown is requested.
    ///
    /// On every tick of `config.timer_poll_interval` at most one timer is
    /// processed via [`Self::process_one`]; errors from that call are logged and
    /// the loop keeps going. Missed ticks are skipped rather than replayed.
    ///
    /// The loop ends when the `shutdown` channel holds `true`, or when its
    /// sender has been dropped (nobody is left who could ever request a
    /// shutdown, so the worker stops instead of running unsupervised).
    pub async fn run(self, mut shutdown: watch::Receiver<bool>) {
        let mut poll_interval = interval(self.config.timer_poll_interval);
        poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        info!(worker_id = %self.worker_id, "Timer worker started");

        // A watch receiver treats the value present at its creation as already
        // seen, so `changed()` would never fire for a shutdown that was
        // signalled before this point. Check the current value once up front.
        if *shutdown.borrow() {
            info!(worker_id = %self.worker_id, "Timer worker shutting down");
            return;
        }

        loop {
            tokio::select! {
                _ = poll_interval.tick() => {
                    if let Err(e) = self.process_one().await {
                        error!(error = %e, "Error processing timer");
                    }
                }
                changed = shutdown.changed() => {
                    // `changed()` fails immediately, and on every later call,
                    // once the sender is gone. Ignoring that would turn this
                    // loop into a busy spin, so a closed channel is treated
                    // the same as an explicit shutdown request.
                    let sender_dropped = changed.is_err();
                    if sender_dropped || *shutdown.borrow() {
                        info!(worker_id = %self.worker_id, "Timer worker shutting down");
                        break;
                    }
                }
            }
        }
    }

    /// Claims at most one timer from the outbox and executes it.
    ///
    /// Returns `Ok(())` without doing anything when no timer could be claimed.
    /// When execution returns an outcome (accepted, rejected, or already
    /// completed) the timer is marked processed. When execution fails, the
    /// failure is recorded on the outbox together with a backoff taken from the
    /// retry policy, and `Ok(())` is still returned.
    ///
    /// # Errors
    ///
    /// Propagates errors from the outbox calls themselves (`claim_timer`,
    /// `mark_timer_processed`, `record_timer_failure`).
    async fn process_one(&self) -> crate::Result<()> {
        let claimed = self
            .outbox
            .claim_timer(
                &self.worker_id,
                &self.registered_types,
                self.config.timer_lock_duration,
                self.config.retry_policy.max_attempts,
            )
            .await?;
        let Some(timer) = claimed else {
            return Ok(());
        };

        debug!(
            timer_id = %timer.id,
            workflow = %timer.workflow,
            attempt = timer.attempts + 1,
            "Processing timer"
        );

        let timer_id = timer.id;
        let attempts = timer.attempts;
        let input = timer.payload;

        match self
            .runtime
            .service()
            .execute_dynamic(timer.workflow.workflow_type(), &input)
            .await
        {
            Ok(outcome) => {
                // Every outcome counts as "handled": rejected and
                // already-completed inputs are only logged, then the timer is
                // marked processed just like an accepted one.
                match &outcome {
                    crate::ExecuteOutcome::Rejected(payload) => {
                        debug!(
                            timer_id = %timer_id,
                            rejection = ?payload,
                            "Timer input rejected by workflow"
                        );
                    }
                    crate::ExecuteOutcome::AlreadyCompleted => {
                        debug!(
                            timer_id = %timer_id,
                            "Timer input dropped — workflow already completed"
                        );
                    }
                    crate::ExecuteOutcome::Accepted { .. } => {}
                }
                self.outbox
                    .mark_timer_processed(timer_id, &self.worker_id)
                    .await?;
                debug!(timer_id = %timer_id, "Timer processed successfully");
            }
            Err(e) => {
                let error_msg = format!("Failed to execute timer input: {e}");
                warn!(timer_id = %timer_id, error = %error_msg, "Timer execution failed");
                // The backoff is computed for the attempt that just failed.
                let backoff = self.config.retry_policy.backoff_duration(attempts + 1);
                self.outbox
                    .record_timer_failure(timer_id, &self.worker_id, &error_msg, backoff)
                    .await?;
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
// NOTE(review): this module is currently empty — no unit tests for
// `TimerWorker` are defined here.
}