1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::prefixed_ulid::ExecutionIdDerived;
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::HistoryEvent;
6use concepts::storage::Version;
7use concepts::ExecutionId;
8use concepts::ExecutionMetadata;
9use concepts::FunctionMetadata;
10use concepts::PermanentFailureKind;
11use concepts::TrapKind;
12use concepts::{
13 storage::{DbError, JoinSetResponseEvent},
14 FinishedExecutionError, StrVariant,
15};
16use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
17use concepts::{Params, SupportedFunctionReturnValue};
18use tracing::Span;
19
20#[async_trait]
21pub trait Worker: Send + Sync + 'static {
22 async fn run(&self, ctx: WorkerContext) -> WorkerResult;
23
24 fn exported_functions(&self) -> &[FunctionMetadata];
25
26 fn imported_functions(&self) -> &[FunctionMetadata];
27}
28
29#[must_use]
30#[derive(Debug)]
31pub enum WorkerResult {
32 Ok(SupportedFunctionReturnValue, Version),
33 DbUpdatedByWorker,
34 Err(WorkerError),
35}
36
37#[derive(Debug)]
38pub struct WorkerContext {
39 pub execution_id: ExecutionId,
40 pub run_id: RunId,
41 pub metadata: ExecutionMetadata,
42 pub ffqn: FunctionFqn,
43 pub params: Params,
44 pub event_history: Vec<HistoryEvent>,
45 pub responses: Vec<JoinSetResponseEvent>,
46 pub version: Version,
47 pub execution_deadline: DateTime<Utc>,
48 pub can_be_retried: bool,
49 pub worker_span: Span,
50}
51
52#[derive(Debug, thiserror::Error)]
53pub enum WorkerError {
54 #[error("activity {trap_kind}: {reason}")]
57 ActivityTrap {
58 reason: String,
59 trap_kind: TrapKind,
60 detail: String,
61 version: Version,
62 },
63 #[error("activity returned error")]
65 ActivityReturnedError {
66 detail: Option<String>,
67 version: Version,
68 },
69 #[error("workflow trap handled as temporary error: {reason}")]
71 TemporaryWorkflowTrap {
72 reason: String,
73 kind: TrapKind,
74 detail: Option<String>,
75 version: Version,
76 },
77 #[error("limit reached: {reason}")]
79 LimitReached { reason: String, version: Version },
80 #[error("temporary timeout")]
82 TemporaryTimeout,
83 #[error(transparent)]
84 DbError(DbError),
85 #[error("fatal error: {0}")]
87 FatalError(FatalError, Version),
88}
89
90#[derive(Debug, thiserror::Error)]
91pub enum FatalError {
92 #[error("child finished with an execution error: {child_execution_id}")]
94 UnhandledChildExecutionError {
95 child_execution_id: ExecutionIdDerived,
96 root_cause_id: ExecutionIdDerived,
97 },
98
99 #[error("nondeterminism detected")]
101 NondeterminismDetected { detail: String },
102 #[error(transparent)]
104 ParamsParsingError(ParamsParsingError),
105 #[error("cannot instantiate: {reason}")]
107 CannotInstantiate { reason: String, detail: String },
108 #[error(transparent)]
110 ResultParsingError(ResultParsingError),
111 #[error("error calling imported function {ffqn} : {reason}")]
113 ImportedFunctionCallError {
114 ffqn: FunctionFqn,
115 reason: StrVariant,
116 detail: Option<String>,
117 },
118
119 #[error("workflow {trap_kind}: {reason}")]
121 WorkflowTrap {
122 reason: String,
123 trap_kind: TrapKind,
124 detail: String,
125 },
126 #[error("join set already exists with name `{name}`")]
128 JoinSetNameConflict { name: String },
129}
130
131impl From<FatalError> for FinishedExecutionError {
132 fn from(value: FatalError) -> Self {
133 let reason_full = value.to_string();
134 match value {
135 FatalError::UnhandledChildExecutionError {
136 child_execution_id,
137 root_cause_id,
138 } => FinishedExecutionError::UnhandledChildExecutionError {
139 child_execution_id,
140 root_cause_id,
141 },
142 FatalError::NondeterminismDetected { detail } => {
143 FinishedExecutionError::PermanentFailure {
144 reason_inner: reason_full.clone(),
145 reason_full,
146 kind: PermanentFailureKind::NondeterminismDetected,
147 detail: Some(detail),
148 }
149 }
150 FatalError::ParamsParsingError(params_parsing_error) => {
151 FinishedExecutionError::PermanentFailure {
152 reason_inner: reason_full.to_string(),
153 reason_full,
154 kind: PermanentFailureKind::ParamsParsingError,
155 detail: params_parsing_error.detail(),
156 }
157 }
158 FatalError::CannotInstantiate {
159 detail,
160 reason: reason_inner,
161 ..
162 } => FinishedExecutionError::PermanentFailure {
163 reason_inner,
164 reason_full,
165 kind: PermanentFailureKind::CannotInstantiate,
166 detail: Some(detail),
167 },
168 FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
169 reason_inner: reason_full.to_string(),
170 reason_full,
171 kind: PermanentFailureKind::ResultParsingError,
172 detail: None,
173 },
174 FatalError::ImportedFunctionCallError {
175 detail,
176 reason: reason_inner,
177 ..
178 } => FinishedExecutionError::PermanentFailure {
179 reason_inner: reason_inner.to_string(),
180 reason_full,
181 kind: PermanentFailureKind::ImportedFunctionCallError,
182 detail,
183 },
184 FatalError::WorkflowTrap {
185 detail,
186 reason: reason_inner,
187 ..
188 } => FinishedExecutionError::PermanentFailure {
189 reason_inner,
190 reason_full,
191 kind: PermanentFailureKind::WorkflowTrap,
192 detail: Some(detail),
193 },
194 FatalError::JoinSetNameConflict { name } => FinishedExecutionError::PermanentFailure {
195 reason_inner: name,
196 reason_full,
197 kind: PermanentFailureKind::JoinSetNameConflict,
198 detail: None,
199 },
200 }
201 }
202}