use std::sync::Arc;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};
use super::RuntimeConfig;
use super::registry::WorkflowRuntime;
use crate::effect::EffectContext;
use crate::store::{OutboxEffect, OutboxStore, Store, WorkflowQueryStore};
pub(crate) struct EffectWorker<S, O>
where
S: Store + WorkflowQueryStore,
O: OutboxStore,
{
runtime: Arc<WorkflowRuntime<S>>,
outbox: O,
config: RuntimeConfig,
worker_id: String,
registered_types: Arc<Vec<String>>,
}
impl<S, O> EffectWorker<S, O>
where
S: Store + WorkflowQueryStore,
O: OutboxStore,
{
pub fn new(
runtime: Arc<WorkflowRuntime<S>>,
outbox: O,
config: RuntimeConfig,
worker_id: String,
registered_types: Arc<Vec<String>>,
) -> Self {
Self {
runtime,
outbox,
config,
worker_id,
registered_types,
}
}
pub async fn run(self, mut shutdown: watch::Receiver<bool>) {
let mut poll_interval = interval(self.config.effect_poll_interval);
poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
info!(worker_id = %self.worker_id, "Effect worker started");
loop {
tokio::select! {
_ = poll_interval.tick() => {
if let Err(e) = self.process_one().await {
error!(error = %e, "Error processing effect");
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
info!(worker_id = %self.worker_id, "Effect worker shutting down");
break;
}
}
}
}
}
async fn process_one(&self) -> crate::Result<()> {
let effect = self
.outbox
.claim_effect(
&self.worker_id,
&self.registered_types,
self.config.effect_lock_duration,
self.config.retry_policy.max_attempts,
)
.await?;
let Some(effect) = effect else {
return Ok(()); };
debug!(
effect_id = %effect.id,
workflow = %effect.workflow,
attempt = effect.attempts + 1,
"Processing effect"
);
let Some((_, entry)) = self
.runtime
.service()
.get_entry(effect.workflow.workflow_type())
else {
let error_msg = format!("Unknown workflow type: {}", effect.workflow.workflow_type());
warn!(effect_id = %effect.id, error = %error_msg, "Dead letter: unknown workflow type");
self.outbox
.record_permanent_failure(
effect.id,
&self.worker_id,
&error_msg,
self.config.retry_policy.max_attempts,
)
.await?;
return Ok(());
};
let ctx = EffectContext::new(
effect.id,
effect.workflow.clone(),
effect.attempts + 1,
effect.created_at,
);
let result = entry.handle_effect(effect.payload.clone(), &ctx).await;
match result {
Ok(maybe_input) => {
if let Some(input_json) = maybe_input {
debug!(
effect_id = %effect.id,
workflow = %effect.workflow,
"Routing effect result as new input"
);
match self
.runtime
.service()
.execute_dynamic(effect.workflow.workflow_type(), &input_json)
.await
{
Ok(outcome) => {
match outcome {
crate::ExecuteOutcome::Rejected(ref payload) => {
debug!(
effect_id = %effect.id,
rejection = ?payload,
"Routed input rejected by workflow"
);
}
crate::ExecuteOutcome::AlreadyCompleted => {
debug!(
effect_id = %effect.id,
"Routed input dropped — workflow already completed"
);
}
crate::ExecuteOutcome::Accepted { .. } => {}
}
}
Err(e) => {
let error_msg = format!("Failed to route result: {}", e);
warn!(effect_id = %effect.id, error = %error_msg, "Result routing failed");
self.record_failure_with_backoff(&effect, &error_msg)
.await?;
return Ok(());
}
}
}
self.outbox
.mark_processed(effect.id, &self.worker_id)
.await?;
debug!(effect_id = %effect.id, "Effect processed successfully");
}
Err(effect_error) => {
let error_msg = effect_error.to_string();
let new_attempts = effect.attempts + 1;
if new_attempts >= self.config.retry_policy.max_attempts {
warn!(
effect_id = %effect.id,
error = %error_msg,
attempts = new_attempts,
max_attempts = self.config.retry_policy.max_attempts,
"Effect exceeded max retries, moving to dead letter"
);
self.outbox
.record_permanent_failure(
effect.id,
&self.worker_id,
&error_msg,
self.config.retry_policy.max_attempts,
)
.await?;
} else {
debug!(
effect_id = %effect.id,
error = %error_msg,
attempts = new_attempts,
"Effect failed, will retry"
);
self.record_failure_with_backoff(&effect, &error_msg)
.await?;
}
}
}
Ok(())
}
async fn record_failure_with_backoff(
&self,
effect: &OutboxEffect,
error: &str,
) -> crate::Result<()> {
let backoff = self
.config
.retry_policy
.backoff_duration(effect.attempts + 1);
self.outbox
.record_failure(effect.id, &self.worker_id, error, backoff)
.await
}
}
#[cfg(test)]
mod tests {
}