1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::ExecutionIdDerived;
9use concepts::prefixed_ulid::RunId;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{
14 FinishedExecutionError, StrVariant,
15 storage::{DbError, JoinSetResponseEvent},
16};
17use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
18use concepts::{Params, SupportedFunctionReturnValue};
19use tracing::Span;
20
21#[async_trait]
22pub trait Worker: Send + Sync + 'static {
23 async fn run(&self, ctx: WorkerContext) -> WorkerResult;
24
25 fn exported_functions(&self) -> &[FunctionMetadata];
29}
30
31#[must_use]
32#[derive(Debug)]
33pub enum WorkerResult {
34 Ok(
35 SupportedFunctionReturnValue,
36 Version,
37 Option<Vec<HttpClientTrace>>,
38 ),
39 DbUpdatedByWorkerOrWatcher,
40 Err(WorkerError),
41}
42
43#[derive(Debug)]
44pub struct WorkerContext {
45 pub execution_id: ExecutionId,
46 pub run_id: RunId,
47 pub metadata: ExecutionMetadata,
48 pub ffqn: FunctionFqn,
49 pub params: Params,
50 pub event_history: Vec<HistoryEvent>,
51 pub responses: Vec<JoinSetResponseEvent>,
52 pub version: Version,
53 pub execution_deadline: DateTime<Utc>,
54 pub can_be_retried: bool,
55 pub worker_span: Span,
56}
57
58#[derive(Debug, thiserror::Error)]
59pub enum WorkerError {
60 #[error("activity {trap_kind}: {reason}")]
63 ActivityTrap {
64 reason: String,
65 trap_kind: TrapKind,
66 detail: String,
67 version: Version,
68 http_client_traces: Option<Vec<HttpClientTrace>>,
69 },
70 #[error("{reason_kind}")]
71 ActivityPreopenedDirError {
72 reason_kind: &'static str,
73 reason_inner: String,
74 version: Version,
75 },
76 #[error("activity returned error")]
78 ActivityReturnedError {
79 detail: Option<String>,
80 version: Version,
81 http_client_traces: Option<Vec<HttpClientTrace>>,
82 },
83 #[error("workflow trap handled as temporary error: {reason}")]
85 TemporaryWorkflowTrap {
86 reason: String,
87 kind: TrapKind,
88 detail: Option<String>,
89 version: Version,
90 },
91 #[error("limit reached: {reason}")]
93 LimitReached { reason: String, version: Version },
94 #[error("temporary timeout")]
96 TemporaryTimeout {
97 http_client_traces: Option<Vec<HttpClientTrace>>,
98 version: Version,
99 },
100 #[error(transparent)]
101 DbError(DbError),
102 #[error("fatal error: {0}")]
104 FatalError(FatalError, Version),
105}
106
107#[derive(Debug, thiserror::Error)]
108pub enum FatalError {
109 #[error("child finished with an execution error: {child_execution_id}")]
111 UnhandledChildExecutionError {
112 child_execution_id: ExecutionIdDerived,
113 root_cause_id: ExecutionIdDerived,
114 },
115
116 #[error("nondeterminism detected")]
118 NondeterminismDetected { detail: String },
119 #[error(transparent)]
121 ParamsParsingError(ParamsParsingError),
122 #[error("cannot instantiate: {reason}")]
124 CannotInstantiate { reason: String, detail: String },
125 #[error(transparent)]
127 ResultParsingError(ResultParsingError),
128 #[error("error calling imported function {ffqn} : {reason}")]
130 ImportedFunctionCallError {
131 ffqn: FunctionFqn,
132 reason: StrVariant,
133 detail: Option<String>,
134 },
135
136 #[error("workflow {trap_kind}: {reason}")]
138 WorkflowTrap {
139 reason: String,
140 trap_kind: TrapKind,
141 detail: String,
142 },
143 #[error("join set already exists with name `{name}`")]
145 JoinSetNameConflict { name: String },
146}
147
148impl From<FatalError> for FinishedExecutionError {
149 fn from(value: FatalError) -> Self {
150 let reason_full = value.to_string();
151 match value {
152 FatalError::UnhandledChildExecutionError {
153 child_execution_id,
154 root_cause_id,
155 } => FinishedExecutionError::UnhandledChildExecutionError {
156 child_execution_id,
157 root_cause_id,
158 },
159 FatalError::NondeterminismDetected { detail } => {
160 FinishedExecutionError::PermanentFailure {
161 reason_inner: reason_full.clone(),
162 reason_full,
163 kind: PermanentFailureKind::NondeterminismDetected,
164 detail: Some(detail),
165 }
166 }
167 FatalError::ParamsParsingError(params_parsing_error) => {
168 FinishedExecutionError::PermanentFailure {
169 reason_inner: reason_full.to_string(),
170 reason_full,
171 kind: PermanentFailureKind::ParamsParsingError,
172 detail: params_parsing_error.detail(),
173 }
174 }
175 FatalError::CannotInstantiate {
176 detail,
177 reason: reason_inner,
178 ..
179 } => FinishedExecutionError::PermanentFailure {
180 reason_inner,
181 reason_full,
182 kind: PermanentFailureKind::CannotInstantiate,
183 detail: Some(detail),
184 },
185 FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
186 reason_inner: reason_full.to_string(),
187 reason_full,
188 kind: PermanentFailureKind::ResultParsingError,
189 detail: None,
190 },
191 FatalError::ImportedFunctionCallError {
192 detail,
193 reason: reason_inner,
194 ..
195 } => FinishedExecutionError::PermanentFailure {
196 reason_inner: reason_inner.to_string(),
197 reason_full,
198 kind: PermanentFailureKind::ImportedFunctionCallError,
199 detail,
200 },
201 FatalError::WorkflowTrap {
202 detail,
203 reason: reason_inner,
204 ..
205 } => FinishedExecutionError::PermanentFailure {
206 reason_inner,
207 reason_full,
208 kind: PermanentFailureKind::WorkflowTrap,
209 detail: Some(detail),
210 },
211 FatalError::JoinSetNameConflict { name } => FinishedExecutionError::PermanentFailure {
212 reason_inner: name,
213 reason_full,
214 kind: PermanentFailureKind::JoinSetNameConflict,
215 detail: None,
216 },
217 }
218 }
219}