use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
use tokio::sync::watch;
use tracing::info;
use uuid::Uuid;
use super::config::RuntimeConfig;
use super::effect_worker::EffectWorker;
use super::timer_worker::TimerWorker;
use crate::Workflow;
use crate::effect::{EffectContext, EffectHandler};
use crate::engine::WorkflowEngine;
use crate::error::Error;
use crate::service::{ExecuteOutcome, WorkflowService, WorkflowServiceConfig};
use crate::store::{DeadLetter, DeadLetterQuery, OutboxStore, Store, WorkflowQueryStore};
/// Render a panic payload as a human-readable message.
///
/// Panic payloads are type-erased; in practice they are almost always a
/// `String` (from `panic!("{}", ..)`) or a `&'static str` (from a literal
/// `panic!("..")`). Anything else yields a fixed placeholder.
fn extract_panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| {
            payload
                .downcast_ref::<&'static str>()
                .map(|s| (*s).to_string())
        })
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
fn log_worker_termination(
runtime_worker_id: &str,
worker_name: &str,
result: std::result::Result<(), tokio::task::JoinError>,
) {
match result {
Ok(()) => {}
Err(e) if e.is_panic() => {
let panic_msg = match e.try_into_panic() {
Ok(payload) => extract_panic_message(payload),
Err(_) => "<join error, not a panic>".to_string(),
};
tracing::error!(
worker_id = %runtime_worker_id,
worker = %worker_name,
panic = %panic_msg,
"Worker task panicked — processing for this worker has stopped"
);
}
Err(e) => {
tracing::warn!(
worker_id = %runtime_worker_id,
worker = %worker_name,
error = ?e,
"Worker task ended unexpectedly (cancelled)"
);
}
}
}
/// Statically-typed access to a registered workflow: execute a typed input
/// and replay the latest typed state, with no JSON round-trips.
#[async_trait]
pub(crate) trait TypedEntry<W: Workflow>: Send + Sync + 'static {
    /// Run one input through the workflow's execution pipeline.
    async fn execute(&self, input: &W::Input) -> crate::Result<ExecuteOutcome<W::Rejection>>;
    /// Rebuild the workflow's current state from its stored event history.
    async fn replay_latest_state(
        &self,
        workflow_id: &crate::workflow::WorkflowId,
    ) -> crate::Result<W::State>;
}
/// Type-erased access to a registered workflow: payloads cross this
/// boundary as `serde_json::Value`, so callers can dispatch by workflow
/// type name without knowing the concrete `Workflow` type.
#[async_trait]
pub(crate) trait DynamicEntry: Send + Sync {
    /// Deserialize `input_json`, execute it, and return the outcome with
    /// any rejection serialized back to JSON.
    async fn execute_dynamic(
        &self,
        input_json: Value,
    ) -> crate::Result<crate::ExecuteOutcome<Value>>;
    /// Deserialize `effect_json`, run the effect handler, and return the
    /// optional follow-up input as JSON. Errors are stringified because the
    /// dynamic boundary carries no typed error information.
    async fn handle_effect(
        &self,
        effect_json: Value,
        ctx: &EffectContext,
    ) -> Result<Option<Value>, String>;
    /// Replay the workflow's events and return the latest state as JSON.
    async fn replay_latest_state_dynamic(
        &self,
        workflow_id: &crate::workflow::WorkflowId,
    ) -> crate::Result<Value>;
}
/// Registry entry binding one workflow type `W` to its effect handler `H`
/// and the store `S` it reads and writes events through. A single entry
/// backs both the typed and the dynamic dispatch paths.
struct WorkflowEntry<W, H, S>
where
    W: Workflow,
    H: EffectHandler<Workflow = W>,
    S: Store,
{
    // Event store used for both execution and replay.
    store: S,
    // Forwarded to `crate::decider::execute`; presumably controls whether
    // raw inputs are persisted as observations — confirm in decider docs.
    record_input_observations: bool,
    // Effect handler dispatched to by `handle_effect`.
    handler: H,
    // `W` appears only in bounds, not in any field.
    _marker: PhantomData<W>,
}
#[async_trait]
impl<W, H, S> DynamicEntry for WorkflowEntry<W, H, S>
where
    W: Workflow + Send + Sync + 'static,
    W::State: Send + Serialize,
    W::Input: Serialize + DeserializeOwned + Send + Sync,
    W::Effect: DeserializeOwned,
    H: EffectHandler<Workflow = W>,
    S: Store + WorkflowQueryStore,
{
    /// Decode the JSON input into `W::Input`, run the typed execute path,
    /// and serialize any rejection back to JSON.
    async fn execute_dynamic(
        &self,
        input_json: Value,
    ) -> crate::Result<crate::ExecuteOutcome<Value>> {
        let typed_input: W::Input = serde_json::from_value(input_json)?;
        let typed_outcome = self.execute(&typed_input).await?;
        typed_outcome
            .try_map(|rejection| serde_json::to_value(&rejection))
            .map_err(crate::Error::from)
    }

    /// Decode the effect JSON, dispatch it to the registered handler, and
    /// encode the optional follow-up input. All errors are stringified at
    /// this type-erased boundary.
    async fn handle_effect(
        &self,
        effect_json: Value,
        ctx: &EffectContext,
    ) -> Result<Option<Value>, String> {
        let effect: W::Effect = serde_json::from_value(effect_json).map_err(|e| e.to_string())?;
        self.handler
            .handle(&effect, ctx)
            .await
            .map_err(|e| e.to_string())?
            .map(|follow_up| serde_json::to_value(follow_up).map_err(|e| e.to_string()))
            .transpose()
    }

    /// Rebuild the latest state via the typed replay path and serialize it.
    async fn replay_latest_state_dynamic(
        &self,
        workflow_id: &crate::workflow::WorkflowId,
    ) -> crate::Result<Value> {
        let state = self.replay_latest_state(workflow_id).await?;
        serde_json::to_value(state).map_err(Into::into)
    }
}
#[async_trait]
impl<W, H, S> TypedEntry<W> for WorkflowEntry<W, H, S>
where
    W: Workflow + Send + Sync + 'static,
    W::State: Send,
    W::Input: Send + Sync,
    W::Rejection: Send,
    H: EffectHandler<Workflow = W>,
    S: Store + WorkflowQueryStore,
{
    /// Run the decider pipeline for one input against this entry's store.
    async fn execute(&self, input: &W::Input) -> crate::Result<ExecuteOutcome<W::Rejection>> {
        crate::decider::execute::<W, _>(&self.store, self.record_input_observations, input).await
    }

    /// Rebuild the current state by folding every stored event through
    /// `W::evolve`, starting from `W::State::default()`. A payload that
    /// fails to deserialize aborts the replay with a typed error carrying
    /// the offending sequence number.
    async fn replay_latest_state(
        &self,
        workflow_id: &crate::workflow::WorkflowId,
    ) -> crate::Result<W::State> {
        let events = self
            .store
            .fetch_workflow_events(W::TYPE, workflow_id)
            .await?;
        events
            .into_iter()
            .try_fold(W::State::default(), |state, event| {
                let sequence = event.sequence;
                let typed: W::Event = serde_json::from_value(event.payload).map_err(|e| {
                    crate::Error::event_deserialization(W::TYPE, workflow_id.as_str(), sequence, e)
                })?;
                Ok(W::evolve(state, typed))
            })
    }
}
/// Lookup table of registered workflows, populated by `WorkflowBuilder`.
/// The same underlying entry is reachable two ways: by workflow type name
/// (dynamic/JSON dispatch) and by Rust `TypeId` (typed dispatch).
pub(crate) struct WorkflowRegistry {
    // Keyed by `W::TYPE` string; values are the type-erased entries.
    entries: HashMap<&'static str, Arc<dyn DynamicEntry>>,
    // Keyed by `TypeId::of::<W>()`; each value is a boxed
    // `Arc<dyn TypedEntry<W>>`, recovered by downcast in `get_typed`.
    typed_entries: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}
impl WorkflowRegistry {
    /// Create an empty registry.
    fn new() -> Self {
        Self {
            entries: HashMap::new(),
            typed_entries: HashMap::new(),
        }
    }

    /// Register workflow `W` with its handler. One shared `Arc`'d entry
    /// backs both the dynamic (by `W::TYPE`) and typed (by `TypeId`)
    /// lookup paths.
    fn register<W, H, S>(&mut self, store: S, record_input_observations: bool, handler: H)
    where
        W: Workflow + Send + Sync + 'static,
        W::State: Send + Serialize,
        W::Input: Serialize + DeserializeOwned + Send + Sync,
        W::Effect: DeserializeOwned,
        W::Rejection: Send,
        H: EffectHandler<Workflow = W>,
        S: Store + WorkflowQueryStore,
    {
        let shared = Arc::new(WorkflowEntry {
            store,
            record_input_observations,
            handler,
            _marker: PhantomData,
        });
        self.entries
            .insert(W::TYPE, Arc::clone(&shared) as Arc<dyn DynamicEntry>);
        self.typed_entries.insert(
            TypeId::of::<W>(),
            Box::new(shared as Arc<dyn TypedEntry<W>>),
        );
    }

    /// Look up a dynamic entry by workflow type name, returning the
    /// canonical `&'static str` key alongside the entry.
    pub(crate) fn get(&self, workflow_type: &str) -> Option<(&'static str, &dyn DynamicEntry)> {
        let (key, entry) = self.entries.get_key_value(workflow_type)?;
        Some((*key, entry.as_ref()))
    }

    /// Look up the typed entry for workflow `W`, if registered.
    pub(crate) fn get_typed<W>(&self) -> Option<Arc<dyn TypedEntry<W>>>
    where
        W: Workflow + 'static,
    {
        let boxed = self.typed_entries.get(&TypeId::of::<W>())?;
        boxed.downcast_ref::<Arc<dyn TypedEntry<W>>>().cloned()
    }

    /// Number of registered workflow types.
    pub(crate) fn len(&self) -> usize {
        self.entries.len()
    }

    /// Owned list of registered workflow type names.
    pub(crate) fn registered_types(&self) -> Vec<String> {
        self.entries.keys().copied().map(String::from).collect()
    }
}
/// Fluent builder that registers workflows and produces a
/// `WorkflowService`, `WorkflowRuntime`, or combined `WorkflowEngine`.
/// Duplicate registrations are recorded here and surface as an error when
/// a `build_*` method is called.
pub struct WorkflowBuilder<S>
where
    S: Store + WorkflowQueryStore,
{
    // Cloned into each registered entry and into the built service/runtime.
    store: S,
    // Accumulated workflow registrations.
    registry: WorkflowRegistry,
    // First duplicate workflow type seen, if any; checked at build time.
    duplicate_workflow_type: Option<String>,
    // Worker/runtime configuration; defaults until `config()` is called.
    config: RuntimeConfig,
    // Configuration forwarded to the `WorkflowService`.
    service_config: WorkflowServiceConfig,
}
impl<S> WorkflowBuilder<S>
where
    S: Store + WorkflowQueryStore,
{
    /// Start a builder with an empty registry and default runtime config.
    fn new(store: S, service_config: WorkflowServiceConfig) -> Self {
        Self {
            store,
            registry: WorkflowRegistry::new(),
            duplicate_workflow_type: None,
            config: RuntimeConfig::default(),
            service_config,
        }
    }

    /// Register a workflow via its effect handler.
    ///
    /// A duplicate registration is not an immediate error: only the first
    /// offending type name is remembered, and the `build_*` methods report
    /// it as `Error::DuplicateWorkflowType`.
    pub fn register<H>(mut self, handler: H) -> Self
    where
        H: EffectHandler,
        H::Workflow: Workflow + Send + Sync + 'static,
        <H::Workflow as Workflow>::State: Send + Serialize,
        <H::Workflow as Workflow>::Input: Serialize + DeserializeOwned + Send + Sync,
        <H::Workflow as Workflow>::Effect: DeserializeOwned,
        <H::Workflow as Workflow>::Rejection: Send,
    {
        if self.registry.entries.contains_key(H::Workflow::TYPE) {
            // Keep only the first duplicate seen.
            self.duplicate_workflow_type
                .get_or_insert_with(|| H::Workflow::TYPE.to_string());
            return self;
        }
        self.registry.register::<H::Workflow, _, _>(
            self.store.clone(),
            self.service_config.record_input_observations,
            handler,
        );
        self
    }

    /// Register a workflow whose effect type is `()` using a no-op handler.
    pub fn register_without_effects<W>(self) -> Self
    where
        W: Workflow<Effect = ()> + Send + Sync + 'static,
        W::State: Send + Serialize,
        W::Input: Serialize + DeserializeOwned + Send + Sync,
        W::Effect: DeserializeOwned,
        W::Rejection: Send,
    {
        self.register(crate::effect::handler::NoopHandler::<W>::default())
    }

    /// Replace the runtime configuration.
    pub fn config(mut self, config: RuntimeConfig) -> Self {
        self.config = config;
        self
    }

    /// Build the combined engine (service + runtime).
    pub fn build_engine(self) -> crate::Result<WorkflowEngine<S>> {
        self.build_parts()
            .map(|(service, runtime)| WorkflowEngine { service, runtime })
    }

    /// Build only the background runtime.
    pub fn build_runtime(self) -> crate::Result<WorkflowRuntime<S>> {
        self.build_parts().map(|(_service, runtime)| runtime)
    }

    /// Build only the request-facing service.
    pub fn build_service(self) -> crate::Result<WorkflowService<S>> {
        if let Some(workflow_type) = self.duplicate_workflow_type {
            return Err(Error::DuplicateWorkflowType(workflow_type));
        }
        Ok(WorkflowService::new(
            self.store,
            Arc::new(self.registry),
            self.service_config,
        ))
    }

    /// Shared construction path: reject duplicates, mint a worker id
    /// (a random UUID when none was configured), then assemble service and
    /// runtime around one shared registry.
    fn build_parts(self) -> crate::Result<(Arc<WorkflowService<S>>, WorkflowRuntime<S>)> {
        if let Some(workflow_type) = self.duplicate_workflow_type {
            return Err(Error::DuplicateWorkflowType(workflow_type));
        }
        let worker_id = match self.config.worker_id.clone() {
            Some(id) => id,
            None => Uuid::new_v4().to_string(),
        };
        let registry = Arc::new(self.registry);
        let service = Arc::new(WorkflowService::new(
            self.store.clone(),
            Arc::clone(&registry),
            self.service_config,
        ));
        let service_handle = Arc::clone(&service);
        let runtime = WorkflowRuntime {
            store: self.store,
            service,
            config: self.config,
            worker_id,
        };
        Ok((service_handle, runtime))
    }
}
/// Handle to the background-processing side of the engine: owns a store
/// handle, a shared `WorkflowService`, the runtime configuration, and this
/// instance's worker id. `Clone` requires `S: Clone` (the derive adds that
/// bound); the service is shared via `Arc`.
#[derive(Clone)]
pub struct WorkflowRuntime<S>
where
    S: Store + WorkflowQueryStore,
{
    // Store used directly for worker construction and dead-letter queries.
    store: S,
    // Request-facing service sharing the same store and registry.
    service: Arc<WorkflowService<S>>,
    // Worker counts, retry policy, shutdown timeout, etc.
    config: RuntimeConfig,
    // Identifier used to name worker tasks and tag log lines.
    worker_id: String,
}
impl<S> WorkflowRuntime<S>
where
    S: Store + WorkflowQueryStore,
{
    /// Entry point for constructing a runtime: returns a builder on which
    /// workflows are registered before building.
    pub fn builder(store: S, service_config: WorkflowServiceConfig) -> WorkflowBuilder<S> {
        WorkflowBuilder::new(store, service_config)
    }

    /// The runtime configuration this instance was built with.
    pub fn config(&self) -> &RuntimeConfig {
        &self.config
    }

    /// Identifier used to name worker tasks and tag log lines.
    pub fn worker_id(&self) -> &str {
        self.worker_id.as_str()
    }

    /// Number of registered workflow types, delegated to the service.
    pub fn workflow_count(&self) -> usize {
        self.service.workflow_count()
    }

    /// Borrow the underlying service (crate-internal).
    pub(crate) fn service(&self) -> &WorkflowService<S> {
        self.service.as_ref()
    }
}
impl<S> WorkflowRuntime<S>
where
    S: Store + WorkflowQueryStore + OutboxStore,
{
    /// Run effect and timer workers until `shutdown` resolves.
    ///
    /// Worker counts come from the config, each clamped to at least 1.
    /// Every worker runs under a small supervisor task whose only job is
    /// to await the worker and log how it ended; a panicked worker is
    /// logged at error level but NOT restarted. After `shutdown` resolves,
    /// `true` is broadcast on the watch channel and the supervisors are
    /// given `config.shutdown_timeout` to finish. NOTE(review): on timeout
    /// the worker tasks are not aborted, so they may keep running detached
    /// until the process exits — confirm this is intended.
    pub async fn run<F>(self, shutdown: F) -> crate::Result<()>
    where
        F: Future<Output = ()> + Send,
    {
        // `false` = keep running; flipped to `true` to request shutdown.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        // Always run at least one worker of each kind.
        let effect_worker_count = self.config.effect_workers.max(1);
        let timer_worker_count = self.config.timer_workers.max(1);
        info!(
            worker_id = %self.worker_id,
            workflows = self.workflow_count(),
            effect_workers = effect_worker_count,
            timer_workers = timer_worker_count,
            "Runtime starting"
        );
        // Shared ownership: each spawned worker holds an Arc of the runtime.
        let runtime = Arc::new(self);
        let registered_types = Arc::new(runtime.service.registered_types());
        if registered_types.is_empty() {
            tracing::warn!(
                worker_id = %runtime.worker_id,
                "Runtime started with no registered workflows — workers will not claim any effects or timers"
            );
        }
        let mut supervisors: Vec<tokio::task::JoinHandle<()>> = Vec::new();
        for i in 0..effect_worker_count {
            // Suffix the index only when there is more than one worker.
            let worker_name = if effect_worker_count == 1 {
                format!("{}-effect", runtime.worker_id)
            } else {
                format!("{}-effect-{}", runtime.worker_id, i)
            };
            let effect_worker = EffectWorker::new(
                Arc::clone(&runtime),
                runtime.store.clone(),
                runtime.config.clone(),
                worker_name.clone(),
                Arc::clone(&registered_types),
            );
            let effect_shutdown_rx = shutdown_rx.clone();
            let runtime_worker_id = runtime.worker_id.clone();
            // Supervisor pattern: the worker runs in its own task so a
            // panic is contained there; the supervisor awaits it and logs
            // the termination.
            supervisors.push(tokio::spawn(async move {
                let inner = tokio::spawn(async move {
                    effect_worker.run(effect_shutdown_rx).await;
                });
                log_worker_termination(&runtime_worker_id, &worker_name, inner.await);
            }));
        }
        for i in 0..timer_worker_count {
            let worker_name = if timer_worker_count == 1 {
                format!("{}-timer", runtime.worker_id)
            } else {
                format!("{}-timer-{}", runtime.worker_id, i)
            };
            let timer_worker = TimerWorker::new(
                Arc::clone(&runtime),
                runtime.store.clone(),
                runtime.config.clone(),
                worker_name.clone(),
                Arc::clone(&registered_types),
            );
            let timer_shutdown_rx = shutdown_rx.clone();
            let runtime_worker_id = runtime.worker_id.clone();
            supervisors.push(tokio::spawn(async move {
                let inner = tokio::spawn(async move {
                    timer_worker.run(timer_shutdown_rx).await;
                });
                log_worker_termination(&runtime_worker_id, &worker_name, inner.await);
            }));
        }
        // Block until the caller asks us to stop.
        shutdown.await;
        // Ignore send errors: all receivers may already be gone.
        let _ = shutdown_tx.send(true);
        let shutdown_timeout = runtime.config.shutdown_timeout;
        let all_workers = async move {
            for supervisor in supervisors {
                let _ = supervisor.await;
            }
        };
        match tokio::time::timeout(shutdown_timeout, all_workers).await {
            Ok(()) => {
                info!(worker_id = %runtime.worker_id, "Runtime stopped gracefully");
            }
            Err(_) => {
                tracing::warn!(
                    worker_id = %runtime.worker_id,
                    timeout_secs = shutdown_timeout.as_secs(),
                    "Shutdown timeout exceeded, forcing stop"
                );
            }
        }
        Ok(())
    }
    /// Fetch effect dead letters matching `query`; the configured
    /// `retry_policy.max_attempts` is passed so the store can decide which
    /// effects count as dead.
    pub async fn fetch_dead_letters(
        &self,
        query: DeadLetterQuery,
    ) -> crate::Result<Vec<DeadLetter>> {
        self.store
            .fetch_dead_letters(&query, self.config.retry_policy.max_attempts)
            .await
    }
    /// Count effect dead letters matching `query`.
    pub async fn count_dead_letters(&self, query: DeadLetterQuery) -> crate::Result<u64> {
        self.store
            .count_dead_letters(&query, self.config.retry_policy.max_attempts)
            .await
    }
    /// Re-queue a dead-lettered effect for retry; returns the store's
    /// success flag (presumably `false` when no such effect exists —
    /// confirm against the store implementation).
    pub async fn retry_dead_letter(&self, effect_id: Uuid) -> crate::Result<bool> {
        self.store.retry_dead_letter(effect_id).await
    }
    /// Fetch timer dead letters matching `query`.
    pub async fn fetch_timer_dead_letters(
        &self,
        query: DeadLetterQuery,
    ) -> crate::Result<Vec<DeadLetter>> {
        self.store
            .fetch_timer_dead_letters(&query, self.config.retry_policy.max_attempts)
            .await
    }
    /// Count timer dead letters matching `query`.
    pub async fn count_timer_dead_letters(&self, query: DeadLetterQuery) -> crate::Result<u64> {
        self.store
            .count_timer_dead_letters(&query, self.config.retry_policy.max_attempts)
            .await
    }
    /// Re-queue a dead-lettered timer for retry; returns the store's
    /// success flag.
    pub async fn retry_timer_dead_letter(&self, timer_id: Uuid) -> crate::Result<bool> {
        self.store.retry_timer_dead_letter(timer_id).await
    }
}