1use async_trait::async_trait;
2use concepts::ExecutionId;
3use concepts::ExecutionMetadata;
4use concepts::FunctionMetadata;
5use concepts::PermanentFailureKind;
6use concepts::TrapKind;
7use concepts::storage::DbErrorWrite;
8use concepts::storage::HistoryEvent;
9use concepts::storage::Locked;
10use concepts::storage::Version;
11use concepts::storage::http_client_trace::HttpClientTrace;
12use concepts::{FinishedExecutionError, StrVariant, storage::JoinSetResponseEvent};
13use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
14use concepts::{Params, SupportedFunctionReturnValue};
15use tracing::Span;
16
17#[async_trait]
18pub trait Worker: Send + Sync + 'static {
19 async fn run(&self, ctx: WorkerContext) -> WorkerResult;
20
21 fn exported_functions(&self) -> &[FunctionMetadata];
25}
26
27#[must_use]
28#[derive(Debug)]
29pub enum WorkerResult {
30 Ok(
31 SupportedFunctionReturnValue,
32 Version,
33 Option<Vec<HttpClientTrace>>,
34 ),
35 DbUpdatedByWorkerOrWatcher,
37 Err(WorkerError),
38}
39
40#[derive(Debug)]
41pub struct WorkerContext {
42 pub execution_id: ExecutionId,
43 pub metadata: ExecutionMetadata,
44 pub ffqn: FunctionFqn,
45 pub params: Params,
46 pub event_history: Vec<HistoryEvent>,
47 pub responses: Vec<JoinSetResponseEvent>,
48 pub version: Version,
49 pub can_be_retried: bool,
50 pub worker_span: Span,
51 pub locked_event: Locked,
52}
53
54#[derive(Debug, thiserror::Error)]
55pub enum WorkerError {
56 #[error("activity {trap_kind}: {reason}")]
59 ActivityTrap {
60 reason: String,
61 trap_kind: TrapKind,
62 detail: Option<String>,
63 version: Version,
64 http_client_traces: Option<Vec<HttpClientTrace>>,
65 },
66 #[error("{reason_kind}")]
67 ActivityPreopenedDirError {
68 reason_kind: &'static str,
69 reason_inner: String,
70 version: Version,
71 },
72 #[error("activity returned error")]
74 ActivityReturnedError {
75 detail: Option<String>,
76 version: Version,
77 http_client_traces: Option<Vec<HttpClientTrace>>,
78 },
79 #[error("limit reached: {reason}")]
81 LimitReached { reason: String, version: Version },
82 #[error("temporary timeout")]
84 TemporaryTimeout {
85 http_client_traces: Option<Vec<HttpClientTrace>>,
86 version: Version,
87 },
88 #[error(transparent)]
89 DbError(DbErrorWrite),
90 #[error("fatal error: {0}")]
92 FatalError(FatalError, Version),
93}
94
95#[derive(Debug, thiserror::Error)]
96pub enum FatalError {
97 #[error("nondeterminism detected")]
99 NondeterminismDetected { detail: String },
100 #[error(transparent)]
102 ParamsParsingError(ParamsParsingError),
103 #[error("cannot instantiate: {reason}")]
105 CannotInstantiate { reason: String, detail: String },
106 #[error(transparent)]
108 ResultParsingError(ResultParsingError),
109 #[error("error calling imported function {ffqn} : {reason}")]
111 ImportedFunctionCallError {
112 ffqn: FunctionFqn,
113 reason: StrVariant,
114 detail: Option<String>,
115 },
116 #[error("workflow {trap_kind}: {reason}")]
118 WorkflowTrap {
119 reason: String,
120 trap_kind: TrapKind,
121 detail: Option<String>,
122 },
123 #[error("out of fuel: {reason}")]
124 OutOfFuel { reason: String },
125}
126
127impl From<FatalError> for FinishedExecutionError {
128 fn from(value: FatalError) -> Self {
129 let reason_full = value.to_string();
130 match value {
131 FatalError::NondeterminismDetected { detail } => {
132 FinishedExecutionError::PermanentFailure {
133 reason_inner: reason_full.clone(),
134 reason_full,
135 kind: PermanentFailureKind::NondeterminismDetected,
136 detail: Some(detail),
137 }
138 }
139 FatalError::ParamsParsingError(params_parsing_error) => {
140 FinishedExecutionError::PermanentFailure {
141 reason_inner: reason_full.clone(),
142 reason_full,
143 kind: PermanentFailureKind::ParamsParsingError,
144 detail: params_parsing_error.detail(),
145 }
146 }
147 FatalError::CannotInstantiate {
148 detail,
149 reason: reason_inner,
150 ..
151 } => FinishedExecutionError::PermanentFailure {
152 reason_inner,
153 reason_full,
154 kind: PermanentFailureKind::CannotInstantiate,
155 detail: Some(detail),
156 },
157 FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
158 reason_inner: reason_full.clone(),
159 reason_full,
160 kind: PermanentFailureKind::ResultParsingError,
161 detail: None,
162 },
163 FatalError::ImportedFunctionCallError {
164 detail,
165 reason: reason_inner,
166 ..
167 } => FinishedExecutionError::PermanentFailure {
168 reason_inner: reason_inner.to_string(),
169 reason_full,
170 kind: PermanentFailureKind::ImportedFunctionCallError,
171 detail,
172 },
173 FatalError::WorkflowTrap {
174 detail,
175 reason: reason_inner,
176 ..
177 } => FinishedExecutionError::PermanentFailure {
178 reason_inner,
179 reason_full,
180 kind: PermanentFailureKind::WorkflowTrap,
181 detail,
182 },
183 FatalError::OutOfFuel {
184 reason: reason_inner,
185 } => FinishedExecutionError::PermanentFailure {
186 reason_inner,
187 reason_full,
188 kind: PermanentFailureKind::OutOfFuel,
189 detail: None,
190 },
191 }
192 }
193}