1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::ExecutionIdDerived;
9use concepts::prefixed_ulid::RunId;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{
14 FinishedExecutionError, StrVariant,
15 storage::{DbError, JoinSetResponseEvent},
16};
17use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
18use concepts::{Params, SupportedFunctionReturnValue};
19use tracing::Span;
20
21#[async_trait]
22pub trait Worker: Send + Sync + 'static {
23 async fn run(&self, ctx: WorkerContext) -> WorkerResult;
24
25 fn exported_functions(&self) -> &[FunctionMetadata];
29}
30
31#[must_use]
32#[derive(Debug)]
33pub enum WorkerResult {
34 Ok(
35 SupportedFunctionReturnValue,
36 Version,
37 Option<Vec<HttpClientTrace>>,
38 ),
39 DbUpdatedByWorkerOrWatcher,
41 Err(WorkerError),
42}
43
44#[derive(Debug)]
45pub struct WorkerContext {
46 pub execution_id: ExecutionId,
47 pub run_id: RunId,
48 pub metadata: ExecutionMetadata,
49 pub ffqn: FunctionFqn,
50 pub params: Params,
51 pub event_history: Vec<HistoryEvent>,
52 pub responses: Vec<JoinSetResponseEvent>,
53 pub version: Version,
54 pub execution_deadline: DateTime<Utc>,
55 pub can_be_retried: bool,
56 pub worker_span: Span,
57}
58
59#[derive(Debug, thiserror::Error)]
60pub enum WorkerError {
61 #[error("activity {trap_kind}: {reason}")]
64 ActivityTrap {
65 reason: String,
66 trap_kind: TrapKind,
67 detail: Option<String>,
68 version: Version,
69 http_client_traces: Option<Vec<HttpClientTrace>>,
70 },
71 #[error("{reason_kind}")]
72 ActivityPreopenedDirError {
73 reason_kind: &'static str,
74 reason_inner: String,
75 version: Version,
76 },
77 #[error("activity returned error")]
79 ActivityReturnedError {
80 detail: Option<String>,
81 version: Version,
82 http_client_traces: Option<Vec<HttpClientTrace>>,
83 },
84 #[error("limit reached: {reason}")]
86 LimitReached { reason: String, version: Version },
87 #[error("temporary timeout")]
89 TemporaryTimeout {
90 http_client_traces: Option<Vec<HttpClientTrace>>,
91 version: Version,
92 },
93 #[error(transparent)]
94 DbError(DbError),
95 #[error("fatal error: {0}")]
97 FatalError(FatalError, Version),
98}
99
100#[derive(Debug, thiserror::Error)]
101pub enum FatalError {
102 #[error("child finished with an execution error: {child_execution_id}")]
104 UnhandledChildExecutionError {
105 child_execution_id: ExecutionIdDerived,
106 root_cause_id: ExecutionIdDerived,
107 },
108 #[error("nondeterminism detected")]
110 NondeterminismDetected { detail: String },
111 #[error(transparent)]
113 ParamsParsingError(ParamsParsingError),
114 #[error("cannot instantiate: {reason}")]
116 CannotInstantiate { reason: String, detail: String },
117 #[error(transparent)]
119 ResultParsingError(ResultParsingError),
120 #[error("error calling imported function {ffqn} : {reason}")]
122 ImportedFunctionCallError {
123 ffqn: FunctionFqn,
124 reason: StrVariant,
125 detail: Option<String>,
126 },
127 #[error("workflow {trap_kind}: {reason}")]
129 WorkflowTrap {
130 reason: String,
131 trap_kind: TrapKind,
132 detail: Option<String>,
133 },
134 #[error("out of fuel: {reason}")]
135 OutOfFuel { reason: String },
136}
137
138impl From<FatalError> for FinishedExecutionError {
139 fn from(value: FatalError) -> Self {
140 let reason_full = value.to_string();
141 match value {
142 FatalError::UnhandledChildExecutionError {
143 child_execution_id,
144 root_cause_id,
145 } => FinishedExecutionError::UnhandledChildExecutionError {
146 child_execution_id,
147 root_cause_id,
148 },
149 FatalError::NondeterminismDetected { detail } => {
150 FinishedExecutionError::PermanentFailure {
151 reason_inner: reason_full.clone(),
152 reason_full,
153 kind: PermanentFailureKind::NondeterminismDetected,
154 detail: Some(detail),
155 }
156 }
157 FatalError::ParamsParsingError(params_parsing_error) => {
158 FinishedExecutionError::PermanentFailure {
159 reason_inner: reason_full.to_string(),
160 reason_full,
161 kind: PermanentFailureKind::ParamsParsingError,
162 detail: params_parsing_error.detail(),
163 }
164 }
165 FatalError::CannotInstantiate {
166 detail,
167 reason: reason_inner,
168 ..
169 } => FinishedExecutionError::PermanentFailure {
170 reason_inner,
171 reason_full,
172 kind: PermanentFailureKind::CannotInstantiate,
173 detail: Some(detail),
174 },
175 FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
176 reason_inner: reason_full.to_string(),
177 reason_full,
178 kind: PermanentFailureKind::ResultParsingError,
179 detail: None,
180 },
181 FatalError::ImportedFunctionCallError {
182 detail,
183 reason: reason_inner,
184 ..
185 } => FinishedExecutionError::PermanentFailure {
186 reason_inner: reason_inner.to_string(),
187 reason_full,
188 kind: PermanentFailureKind::ImportedFunctionCallError,
189 detail,
190 },
191 FatalError::WorkflowTrap {
192 detail,
193 reason: reason_inner,
194 ..
195 } => FinishedExecutionError::PermanentFailure {
196 reason_inner,
197 reason_full,
198 kind: PermanentFailureKind::WorkflowTrap,
199 detail,
200 },
201 FatalError::OutOfFuel {
202 reason: reason_inner,
203 } => FinishedExecutionError::PermanentFailure {
204 reason_inner,
205 reason_full,
206 kind: PermanentFailureKind::OutOfFuel,
207 detail: None,
208 },
209 }
210 }
211}