1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::ExecutionIdDerived;
9use concepts::prefixed_ulid::RunId;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{
14 FinishedExecutionError, StrVariant,
15 storage::{DbError, JoinSetResponseEvent},
16};
17use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
18use concepts::{Params, SupportedFunctionReturnValue};
19use tracing::Span;
20
/// A component that can run an execution described by a [`WorkerContext`].
///
/// Implementors must be `Send + Sync + 'static`. The `#[async_trait]` attribute
/// is what allows the trait to declare an `async fn`.
#[async_trait]
pub trait Worker: Send + Sync + 'static {
    /// Runs the execution described by `ctx` and reports the outcome.
    ///
    /// The context is taken by value. The returned [`WorkerResult`] is
    /// `#[must_use]`, so callers are expected to inspect it.
    async fn run(&self, ctx: WorkerContext) -> WorkerResult;

    /// Returns metadata for the functions this worker exports.
    fn exported_functions(&self) -> &[FunctionMetadata];

    /// Returns metadata for the functions this worker imports.
    fn imported_functions(&self) -> &[FunctionMetadata];
}
29
/// Outcome of [`Worker::run`].
///
/// The type is `#[must_use]`: silently dropping a value produces a compiler
/// warning.
#[must_use]
#[derive(Debug)]
pub enum WorkerResult {
    /// The run produced a return value.
    ///
    /// Carries, in order: the return value, a [`Version`], and optionally the
    /// HTTP client traces collected during the run.
    // NOTE(review): the `Version` is presumably the execution-log version the
    // result should be written against — confirm with the consumer.
    Ok(
        SupportedFunctionReturnValue,
        Version,
        Option<Vec<HttpClientTrace>>,
    ),
    /// No payload is carried by this variant.
    // NOTE(review): judging by the name, the worker has already written its
    // progress to the database and the caller has nothing left to persist —
    // confirm with the consumer.
    DbUpdatedByWorker,
    /// The run failed; the cause is described by the wrapped [`WorkerError`].
    Err(WorkerError),
}
41
/// Input handed to [`Worker::run`] for a single run.
#[derive(Debug)]
pub struct WorkerContext {
    /// Identifier of the execution being run.
    pub execution_id: ExecutionId,
    /// Identifier of this run.
    // NOTE(review): presumably distinguishes individual attempts of the same
    // execution — confirm.
    pub run_id: RunId,
    /// Metadata attached to the execution.
    pub metadata: ExecutionMetadata,
    /// Fully qualified name of the function to run.
    pub ffqn: FunctionFqn,
    /// Parameters for the function.
    pub params: Params,
    /// History events of the execution.
    // NOTE(review): presumably the events recorded so far, used for replay —
    // confirm.
    pub event_history: Vec<HistoryEvent>,
    /// Join set response events of the execution.
    pub responses: Vec<JoinSetResponseEvent>,
    /// Version supplied together with the rest of the context.
    // NOTE(review): presumably the execution-log version at the time the
    // context was built — confirm.
    pub version: Version,
    /// Deadline of the execution, as a UTC timestamp.
    // NOTE(review): whether the worker itself must enforce this is not visible
    // here — confirm.
    pub execution_deadline: DateTime<Utc>,
    /// Flag telling the worker whether a retry is possible.
    // NOTE(review): exact effect on error handling is decided by the
    // implementor/consumer — confirm.
    pub can_be_retried: bool,
    /// Tracing span supplied for the worker.
    pub worker_span: Span,
}
56
/// Failure reported by a worker through [`WorkerResult::Err`].
///
/// `Display` and `std::error::Error` are derived with `thiserror`; the
/// `#[error(...)]` attribute on each variant is its message.
// NOTE(review): the variant names suggest that the "temporary" variants may be
// retried while `FatalError` may not — confirm with the code that consumes
// this enum.
#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
    /// An activity trapped. The message contains `trap_kind` and `reason`;
    /// `detail` and the optional HTTP client traces are carried alongside.
    #[error("activity {trap_kind}: {reason}")]
    ActivityTrap {
        reason: String,
        trap_kind: TrapKind,
        detail: String,
        version: Version,
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// An activity returned an error. Optionally carries a `detail` string and
    /// the HTTP client traces.
    #[error("activity returned error")]
    ActivityReturnedError {
        detail: Option<String>,
        version: Version,
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// A workflow trap that is handled as a temporary error. Only `reason` is
    /// part of the message.
    #[error("workflow trap handled as temporary error: {reason}")]
    TemporaryWorkflowTrap {
        reason: String,
        kind: TrapKind,
        detail: Option<String>,
        version: Version,
    },
    /// A limit was reached; `reason` says which.
    #[error("limit reached: {reason}")]
    LimitReached { reason: String, version: Version },
    /// The run timed out. Optionally carries the HTTP client traces.
    #[error("temporary timeout")]
    TemporaryTimeout {
        http_client_traces: Option<Vec<HttpClientTrace>>,
        version: Version,
    },
    /// A timeout that was handled by a watcher. Unlike
    /// [`WorkerError::TemporaryTimeout`], this variant carries no [`Version`].
    // NOTE(review): presumably the watcher already recorded the timeout, so the
    // caller has nothing to persist — confirm.
    #[error("temporary timeout handled by watcher")]
    TemporaryTimeoutHandledByWatcher,
    /// A database error. `#[error(transparent)]` forwards `Display` and
    /// `source` to the wrapped [`DbError`].
    #[error(transparent)]
    DbError(DbError),
    /// A [`FatalError`] together with a [`Version`]. Only the fatal error
    /// (`{0}`) appears in the message.
    #[error("fatal error: {0}")]
    FatalError(FatalError, Version),
}
102
/// Payload of [`WorkerError::FatalError`].
///
/// A `From<FatalError>` implementation in this file converts every variant into
/// a `FinishedExecutionError`. `Display` is derived with `thiserror`; the
/// `#[error(...)]` attribute on each variant is its message.
#[derive(Debug, thiserror::Error)]
pub enum FatalError {
    /// A child execution finished with an execution error. Only
    /// `child_execution_id` is part of the message; `root_cause_id` is carried
    /// alongside.
    // NOTE(review): `root_cause_id` presumably identifies the execution where
    // the failure originated — confirm.
    #[error("child finished with an execution error: {child_execution_id}")]
    UnhandledChildExecutionError {
        child_execution_id: ExecutionIdDerived,
        root_cause_id: ExecutionIdDerived,
    },

    /// Nondeterminism was detected. `detail` is not part of the message.
    #[error("nondeterminism detected")]
    NondeterminismDetected { detail: String },
    /// Parameter parsing failed. `#[error(transparent)]` forwards `Display`
    /// and `source` to the wrapped [`ParamsParsingError`].
    #[error(transparent)]
    ParamsParsingError(ParamsParsingError),
    /// Instantiation failed. `reason` is part of the message, `detail` is not.
    #[error("cannot instantiate: {reason}")]
    CannotInstantiate { reason: String, detail: String },
    /// Result parsing failed. `#[error(transparent)]` forwards `Display` and
    /// `source` to the wrapped [`ResultParsingError`].
    #[error(transparent)]
    ResultParsingError(ResultParsingError),
    /// Calling an imported function failed. The message contains `ffqn` and
    /// `reason`; `detail` is optional and not part of the message.
    #[error("error calling imported function {ffqn} : {reason}")]
    ImportedFunctionCallError {
        ffqn: FunctionFqn,
        reason: StrVariant,
        detail: Option<String>,
    },

    /// A workflow trapped. The message contains `trap_kind` and `reason`;
    /// `detail` is carried alongside.
    #[error("workflow {trap_kind}: {reason}")]
    WorkflowTrap {
        reason: String,
        trap_kind: TrapKind,
        detail: String,
    },
    /// A join set with the given `name` already exists.
    #[error("join set already exists with name `{name}`")]
    JoinSetNameConflict { name: String },
}
143
144impl From<FatalError> for FinishedExecutionError {
145 fn from(value: FatalError) -> Self {
146 let reason_full = value.to_string();
147 match value {
148 FatalError::UnhandledChildExecutionError {
149 child_execution_id,
150 root_cause_id,
151 } => FinishedExecutionError::UnhandledChildExecutionError {
152 child_execution_id,
153 root_cause_id,
154 },
155 FatalError::NondeterminismDetected { detail } => {
156 FinishedExecutionError::PermanentFailure {
157 reason_inner: reason_full.clone(),
158 reason_full,
159 kind: PermanentFailureKind::NondeterminismDetected,
160 detail: Some(detail),
161 }
162 }
163 FatalError::ParamsParsingError(params_parsing_error) => {
164 FinishedExecutionError::PermanentFailure {
165 reason_inner: reason_full.to_string(),
166 reason_full,
167 kind: PermanentFailureKind::ParamsParsingError,
168 detail: params_parsing_error.detail(),
169 }
170 }
171 FatalError::CannotInstantiate {
172 detail,
173 reason: reason_inner,
174 ..
175 } => FinishedExecutionError::PermanentFailure {
176 reason_inner,
177 reason_full,
178 kind: PermanentFailureKind::CannotInstantiate,
179 detail: Some(detail),
180 },
181 FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
182 reason_inner: reason_full.to_string(),
183 reason_full,
184 kind: PermanentFailureKind::ResultParsingError,
185 detail: None,
186 },
187 FatalError::ImportedFunctionCallError {
188 detail,
189 reason: reason_inner,
190 ..
191 } => FinishedExecutionError::PermanentFailure {
192 reason_inner: reason_inner.to_string(),
193 reason_full,
194 kind: PermanentFailureKind::ImportedFunctionCallError,
195 detail,
196 },
197 FatalError::WorkflowTrap {
198 detail,
199 reason: reason_inner,
200 ..
201 } => FinishedExecutionError::PermanentFailure {
202 reason_inner,
203 reason_full,
204 kind: PermanentFailureKind::WorkflowTrap,
205 detail: Some(detail),
206 },
207 FatalError::JoinSetNameConflict { name } => FinishedExecutionError::PermanentFailure {
208 reason_inner: name,
209 reason_full,
210 kind: PermanentFailureKind::JoinSetNameConflict,
211 detail: None,
212 },
213 }
214 }
215}