use async_trait::async_trait;
use chrono::{DateTime, Utc};
use concepts::prefixed_ulid::ExecutionIdDerived;
use concepts::prefixed_ulid::RunId;
use concepts::storage::HistoryEvent;
use concepts::storage::Version;
use concepts::ExecutionId;
use concepts::ExecutionMetadata;
use concepts::FunctionMetadata;
use concepts::PermanentFailureKind;
use concepts::TrapKind;
use concepts::{
storage::{DbError, JoinSetResponseEvent},
FinishedExecutionError, StrVariant,
};
use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
use concepts::{Params, SupportedFunctionReturnValue};
use tracing::Span;
/// Asynchronous worker abstraction.
///
/// Implementors must be `Send + Sync + 'static`. The `#[async_trait]` attribute
/// is what allows the `async fn` in this trait definition.
#[async_trait]
pub trait Worker: Send + Sync + 'static {
/// Runs the worker once, consuming the supplied [`WorkerContext`], and
/// reports the outcome as a [`WorkerResult`].
async fn run(&self, ctx: WorkerContext) -> WorkerResult;
/// Returns metadata for the worker's exported functions.
// NOTE(review): exact meaning of "exported" is defined by implementors - not visible here.
fn exported_functions(&self) -> &[FunctionMetadata];
/// Returns metadata for the worker's imported functions.
// NOTE(review): exact meaning of "imported" is defined by implementors - not visible here.
fn imported_functions(&self) -> &[FunctionMetadata];
}
/// Outcome of a single [`Worker::run`] invocation.
///
/// Marked `#[must_use]`, so silently discarding a value of this type triggers
/// a compiler warning.
#[must_use]
#[derive(Debug)]
pub enum WorkerResult {
/// The run produced a return value, paired with a [`Version`].
// NOTE(review): presumably the version the caller should use for the next write - confirm.
Ok(SupportedFunctionReturnValue, Version),
/// No payload is carried.
// NOTE(review): the name suggests the worker already persisted its progress itself - confirm with callers.
DbUpdatedByWorker,
/// The run failed; see [`WorkerError`] for the possible causes.
Err(WorkerError),
}
/// Input handed to [`Worker::run`] by value.
///
/// Field semantics below are derived from names and types only; this file does
/// not show how the fields are consumed.
#[derive(Debug)]
pub struct WorkerContext {
/// Identifier of the execution being run.
pub execution_id: ExecutionId,
/// Identifier of this particular run.
// NOTE(review): presumably distinguishes retries of the same execution - confirm.
pub run_id: RunId,
/// Metadata attached to the execution.
pub metadata: ExecutionMetadata,
/// Fully qualified name of the function to invoke.
pub ffqn: FunctionFqn,
/// Parameters for the function call.
pub params: Params,
/// History events recorded for this execution so far.
// NOTE(review): presumably used for replay - confirm.
pub event_history: Vec<HistoryEvent>,
/// Join set responses recorded for this execution so far.
pub responses: Vec<JoinSetResponseEvent>,
/// Version associated with the execution when the context was built.
pub version: Version,
/// UTC deadline associated with the execution.
// NOTE(review): presumably the instant by which the run must finish - confirm.
pub execution_deadline: DateTime<Utc>,
/// Retry flag.
// NOTE(review): presumably tells the worker whether a failure may be retried - confirm.
pub can_be_retried: bool,
/// Tracing span associated with the worker.
pub worker_span: Span,
}
/// Errors that a worker run can report via [`WorkerResult::Err`].
///
/// `Display` and `std::error::Error` are derived by `thiserror`; the message
/// of each variant is given by its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
/// An activity trapped. Displayed as `activity {trap_kind}: {reason}`;
/// `detail` and `version` are not part of the message.
#[error("activity {trap_kind}: {reason}")]
ActivityTrap {
reason: String,
trap_kind: TrapKind,
detail: String,
version: Version,
},
/// An activity returned an error. Displayed with a fixed message;
/// the optional `detail` is not part of it.
#[error("activity returned error")]
ActivityReturnedError {
detail: Option<String>,
version: Version,
},
/// A workflow trap that is treated as a temporary error, per the message.
/// Only `reason` is included in the displayed text.
#[error("workflow trap handled as temporary error: {reason}")]
TemporaryWorkflowTrap {
reason: String,
kind: TrapKind,
detail: Option<String>,
version: Version,
},
/// A limit was reached. Displayed as `limit reached: {reason}`.
#[error("limit reached: {reason}")]
LimitReached { reason: String, version: Version },
/// A temporary timeout. Carries no payload (notably no `Version`).
#[error("temporary timeout")]
TemporaryTimeout,
/// A database error; `transparent` forwards `Display` and `source` to the
/// wrapped [`DbError`].
#[error(transparent)]
DbError(DbError),
/// A [`FatalError`] together with a [`Version`]. Displayed as
/// `fatal error: ` followed by the inner error's message.
#[error("fatal error: {0}")]
FatalError(FatalError, Version),
}
/// Fatal errors carried by [`WorkerError::FatalError`].
///
/// Each value can be converted into a [`FinishedExecutionError`] through the
/// `From<FatalError>` implementation in this file. `Display` messages come
/// from the `#[error(...)]` attributes.
#[derive(Debug, thiserror::Error)]
pub enum FatalError {
/// A child execution finished with an execution error. Only
/// `child_execution_id` appears in the message; `root_cause_id` does not.
#[error("child finished with an execution error: {child_execution_id}")]
UnhandledChildExecutionError {
child_execution_id: ExecutionIdDerived,
root_cause_id: ExecutionIdDerived,
},
/// Nondeterminism was detected. `detail` is not part of the message.
#[error("nondeterminism detected")]
NondeterminismDetected { detail: String },
/// Parameter parsing failed; `transparent` forwards `Display` and `source`
/// to the wrapped [`ParamsParsingError`].
#[error(transparent)]
ParamsParsingError(ParamsParsingError),
/// Instantiation failed. Displayed as `cannot instantiate: {reason}`;
/// `detail` is not part of the message.
#[error("cannot instantiate: {reason}")]
CannotInstantiate { reason: String, detail: String },
/// Result parsing failed; `transparent` forwards `Display` and `source`
/// to the wrapped [`ResultParsingError`].
#[error(transparent)]
ResultParsingError(ResultParsingError),
/// Calling an imported function failed. The message names the function
/// (`ffqn`) and the `reason`; the optional `detail` is not included.
#[error("error calling imported function {ffqn} : {reason}")]
ImportedFunctionCallError {
ffqn: FunctionFqn,
reason: StrVariant,
detail: Option<String>,
},
/// A workflow trapped. Displayed as `workflow {trap_kind}: {reason}`;
/// `detail` is not part of the message.
#[error("workflow {trap_kind}: {reason}")]
WorkflowTrap {
reason: String,
trap_kind: TrapKind,
detail: String,
},
/// A join set with the given `name` already exists.
#[error("join set already exists with name `{name}`")]
JoinSetNameConflict { name: String },
}
impl From<FatalError> for FinishedExecutionError {
    /// Converts a [`FatalError`] into the error stored for a finished execution.
    ///
    /// * `UnhandledChildExecutionError` maps to the variant of the same name,
    ///   carrying both ids over unchanged.
    /// * Every other variant becomes `PermanentFailure`, where `reason_full`
    ///   is the `Display` rendering of the fatal error and `kind` mirrors the
    ///   source variant. `reason_inner` is the variant's own `reason` (or the
    ///   join set `name`) when it has one, and a copy of `reason_full`
    ///   otherwise. `detail` is forwarded when the variant provides one.
    fn from(value: FatalError) -> Self {
        // Render the message up front: `value` is taken apart by the match below.
        let reason_full = value.to_string();

        // Each arm yields `(kind, reason_inner override, detail)`.
        // A `None` override means "reuse `reason_full` as `reason_inner`".
        let (kind, inner_override, detail) = match value {
            FatalError::UnhandledChildExecutionError {
                child_execution_id,
                root_cause_id,
            } => {
                // The only variant that does not become a `PermanentFailure`.
                return FinishedExecutionError::UnhandledChildExecutionError {
                    child_execution_id,
                    root_cause_id,
                };
            }
            FatalError::NondeterminismDetected { detail } => (
                PermanentFailureKind::NondeterminismDetected,
                None,
                Some(detail),
            ),
            FatalError::ParamsParsingError(parsing_err) => (
                PermanentFailureKind::ParamsParsingError,
                None,
                parsing_err.detail(),
            ),
            FatalError::CannotInstantiate { reason, detail } => (
                PermanentFailureKind::CannotInstantiate,
                Some(reason),
                Some(detail),
            ),
            FatalError::ResultParsingError(_) => {
                (PermanentFailureKind::ResultParsingError, None, None)
            }
            FatalError::ImportedFunctionCallError { reason, detail, .. } => (
                PermanentFailureKind::ImportedFunctionCallError,
                Some(reason.to_string()),
                detail,
            ),
            FatalError::WorkflowTrap { reason, detail, .. } => (
                PermanentFailureKind::WorkflowTrap,
                Some(reason),
                Some(detail),
            ),
            FatalError::JoinSetNameConflict { name } => (
                PermanentFailureKind::JoinSetNameConflict,
                Some(name),
                None,
            ),
        };

        FinishedExecutionError::PermanentFailure {
            // Evaluated before `reason_full` is moved into the next field.
            reason_inner: inner_override.unwrap_or_else(|| reason_full.clone()),
            reason_full,
            kind,
            detail,
        }
    }
}