1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::ExecutionIdDerived;
9use concepts::prefixed_ulid::RunId;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{
14 FinishedExecutionError, StrVariant,
15 storage::{DbError, JoinSetResponseEvent},
16};
17use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
18use concepts::{Params, SupportedFunctionReturnValue};
19use tracing::Span;
20
21#[async_trait]
22pub trait Worker: Send + Sync + 'static {
23 async fn run(&self, ctx: WorkerContext) -> WorkerResult;
24
25 fn exported_functions(&self) -> &[FunctionMetadata];
29}
30
31#[must_use]
32#[derive(Debug)]
33pub enum WorkerResult {
34 Ok(
35 SupportedFunctionReturnValue,
36 Version,
37 Option<Vec<HttpClientTrace>>,
38 ),
39 DbUpdatedByWorkerOrWatcher,
40 Err(WorkerError),
41}
42
43#[derive(Debug)]
44pub struct WorkerContext {
45 pub execution_id: ExecutionId,
46 pub run_id: RunId,
47 pub metadata: ExecutionMetadata,
48 pub ffqn: FunctionFqn,
49 pub params: Params,
50 pub event_history: Vec<HistoryEvent>,
51 pub responses: Vec<JoinSetResponseEvent>,
52 pub version: Version,
53 pub execution_deadline: DateTime<Utc>,
54 pub can_be_retried: bool,
55 pub worker_span: Span,
56}
57
58#[derive(Debug, thiserror::Error)]
59pub enum WorkerError {
60 #[error("activity {trap_kind}: {reason}")]
63 ActivityTrap {
64 reason: String,
65 trap_kind: TrapKind,
66 detail: String,
67 version: Version,
68 http_client_traces: Option<Vec<HttpClientTrace>>,
69 },
70 #[error("activity returned error")]
72 ActivityReturnedError {
73 detail: Option<String>,
74 version: Version,
75 http_client_traces: Option<Vec<HttpClientTrace>>,
76 },
77 #[error("workflow trap handled as temporary error: {reason}")]
79 TemporaryWorkflowTrap {
80 reason: String,
81 kind: TrapKind,
82 detail: Option<String>,
83 version: Version,
84 },
85 #[error("limit reached: {reason}")]
87 LimitReached { reason: String, version: Version },
88 #[error("temporary timeout")]
90 TemporaryTimeout {
91 http_client_traces: Option<Vec<HttpClientTrace>>,
92 version: Version,
93 },
94 #[error(transparent)]
95 DbError(DbError),
96 #[error("fatal error: {0}")]
98 FatalError(FatalError, Version),
99}
100
101#[derive(Debug, thiserror::Error)]
102pub enum FatalError {
103 #[error("child finished with an execution error: {child_execution_id}")]
105 UnhandledChildExecutionError {
106 child_execution_id: ExecutionIdDerived,
107 root_cause_id: ExecutionIdDerived,
108 },
109
110 #[error("nondeterminism detected")]
112 NondeterminismDetected { detail: String },
113 #[error(transparent)]
115 ParamsParsingError(ParamsParsingError),
116 #[error("cannot instantiate: {reason}")]
118 CannotInstantiate { reason: String, detail: String },
119 #[error(transparent)]
121 ResultParsingError(ResultParsingError),
122 #[error("error calling imported function {ffqn} : {reason}")]
124 ImportedFunctionCallError {
125 ffqn: FunctionFqn,
126 reason: StrVariant,
127 detail: Option<String>,
128 },
129
130 #[error("workflow {trap_kind}: {reason}")]
132 WorkflowTrap {
133 reason: String,
134 trap_kind: TrapKind,
135 detail: String,
136 },
137 #[error("join set already exists with name `{name}`")]
139 JoinSetNameConflict { name: String },
140}
141
142impl From<FatalError> for FinishedExecutionError {
143 fn from(value: FatalError) -> Self {
144 let reason_full = value.to_string();
145 match value {
146 FatalError::UnhandledChildExecutionError {
147 child_execution_id,
148 root_cause_id,
149 } => FinishedExecutionError::UnhandledChildExecutionError {
150 child_execution_id,
151 root_cause_id,
152 },
153 FatalError::NondeterminismDetected { detail } => {
154 FinishedExecutionError::PermanentFailure {
155 reason_inner: reason_full.clone(),
156 reason_full,
157 kind: PermanentFailureKind::NondeterminismDetected,
158 detail: Some(detail),
159 }
160 }
161 FatalError::ParamsParsingError(params_parsing_error) => {
162 FinishedExecutionError::PermanentFailure {
163 reason_inner: reason_full.to_string(),
164 reason_full,
165 kind: PermanentFailureKind::ParamsParsingError,
166 detail: params_parsing_error.detail(),
167 }
168 }
169 FatalError::CannotInstantiate {
170 detail,
171 reason: reason_inner,
172 ..
173 } => FinishedExecutionError::PermanentFailure {
174 reason_inner,
175 reason_full,
176 kind: PermanentFailureKind::CannotInstantiate,
177 detail: Some(detail),
178 },
179 FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
180 reason_inner: reason_full.to_string(),
181 reason_full,
182 kind: PermanentFailureKind::ResultParsingError,
183 detail: None,
184 },
185 FatalError::ImportedFunctionCallError {
186 detail,
187 reason: reason_inner,
188 ..
189 } => FinishedExecutionError::PermanentFailure {
190 reason_inner: reason_inner.to_string(),
191 reason_full,
192 kind: PermanentFailureKind::ImportedFunctionCallError,
193 detail,
194 },
195 FatalError::WorkflowTrap {
196 detail,
197 reason: reason_inner,
198 ..
199 } => FinishedExecutionError::PermanentFailure {
200 reason_inner,
201 reason_full,
202 kind: PermanentFailureKind::WorkflowTrap,
203 detail: Some(detail),
204 },
205 FatalError::JoinSetNameConflict { name } => FinishedExecutionError::PermanentFailure {
206 reason_inner: name,
207 reason_full,
208 kind: PermanentFailureKind::JoinSetNameConflict,
209 detail: None,
210 },
211 }
212 }
213}