1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::RunId;
9use concepts::storage::DbErrorWrite;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{FinishedExecutionError, StrVariant, storage::JoinSetResponseEvent};
14use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
15use concepts::{Params, SupportedFunctionReturnValue};
16use tracing::Span;
17
18#[async_trait]
19pub trait Worker: Send + Sync + 'static {
20 async fn run(&self, ctx: WorkerContext) -> WorkerResult;
21
22 fn exported_functions(&self) -> &[FunctionMetadata];
26}
27
28#[must_use]
29#[derive(Debug)]
30pub enum WorkerResult {
31 Ok(
32 SupportedFunctionReturnValue,
33 Version,
34 Option<Vec<HttpClientTrace>>,
35 ),
36 DbUpdatedByWorkerOrWatcher,
38 Err(WorkerError),
39}
40
41#[derive(Debug)]
42pub struct WorkerContext {
43 pub execution_id: ExecutionId,
44 pub run_id: RunId,
45 pub metadata: ExecutionMetadata,
46 pub ffqn: FunctionFqn,
47 pub params: Params,
48 pub event_history: Vec<HistoryEvent>,
49 pub responses: Vec<JoinSetResponseEvent>,
50 pub version: Version,
51 pub execution_deadline: DateTime<Utc>,
52 pub can_be_retried: bool,
53 pub worker_span: Span,
54}
55
56#[derive(Debug, thiserror::Error)]
57pub enum WorkerError {
58 #[error("activity {trap_kind}: {reason}")]
61 ActivityTrap {
62 reason: String,
63 trap_kind: TrapKind,
64 detail: Option<String>,
65 version: Version,
66 http_client_traces: Option<Vec<HttpClientTrace>>,
67 },
68 #[error("{reason_kind}")]
69 ActivityPreopenedDirError {
70 reason_kind: &'static str,
71 reason_inner: String,
72 version: Version,
73 },
74 #[error("activity returned error")]
76 ActivityReturnedError {
77 detail: Option<String>,
78 version: Version,
79 http_client_traces: Option<Vec<HttpClientTrace>>,
80 },
81 #[error("limit reached: {reason}")]
83 LimitReached { reason: String, version: Version },
84 #[error("temporary timeout")]
86 TemporaryTimeout {
87 http_client_traces: Option<Vec<HttpClientTrace>>,
88 version: Version,
89 },
90 #[error(transparent)]
91 DbError(DbErrorWrite),
92 #[error("fatal error: {0}")]
94 FatalError(FatalError, Version),
95}
96
97#[derive(Debug, thiserror::Error)]
98pub enum FatalError {
99 #[error("nondeterminism detected")]
101 NondeterminismDetected { detail: String },
102 #[error(transparent)]
104 ParamsParsingError(ParamsParsingError),
105 #[error("cannot instantiate: {reason}")]
107 CannotInstantiate { reason: String, detail: String },
108 #[error(transparent)]
110 ResultParsingError(ResultParsingError),
111 #[error("error calling imported function {ffqn} : {reason}")]
113 ImportedFunctionCallError {
114 ffqn: FunctionFqn,
115 reason: StrVariant,
116 detail: Option<String>,
117 },
118 #[error("workflow {trap_kind}: {reason}")]
120 WorkflowTrap {
121 reason: String,
122 trap_kind: TrapKind,
123 detail: Option<String>,
124 },
125 #[error("out of fuel: {reason}")]
126 OutOfFuel { reason: String },
127}
128
129impl From<FatalError> for FinishedExecutionError {
130 fn from(value: FatalError) -> Self {
131 let reason_full = value.to_string();
132 match value {
133 FatalError::NondeterminismDetected { detail } => {
134 FinishedExecutionError::PermanentFailure {
135 reason_inner: reason_full.clone(),
136 reason_full,
137 kind: PermanentFailureKind::NondeterminismDetected,
138 detail: Some(detail),
139 }
140 }
141 FatalError::ParamsParsingError(params_parsing_error) => {
142 FinishedExecutionError::PermanentFailure {
143 reason_inner: reason_full.clone(),
144 reason_full,
145 kind: PermanentFailureKind::ParamsParsingError,
146 detail: params_parsing_error.detail(),
147 }
148 }
149 FatalError::CannotInstantiate {
150 detail,
151 reason: reason_inner,
152 ..
153 } => FinishedExecutionError::PermanentFailure {
154 reason_inner,
155 reason_full,
156 kind: PermanentFailureKind::CannotInstantiate,
157 detail: Some(detail),
158 },
159 FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
160 reason_inner: reason_full.clone(),
161 reason_full,
162 kind: PermanentFailureKind::ResultParsingError,
163 detail: None,
164 },
165 FatalError::ImportedFunctionCallError {
166 detail,
167 reason: reason_inner,
168 ..
169 } => FinishedExecutionError::PermanentFailure {
170 reason_inner: reason_inner.to_string(),
171 reason_full,
172 kind: PermanentFailureKind::ImportedFunctionCallError,
173 detail,
174 },
175 FatalError::WorkflowTrap {
176 detail,
177 reason: reason_inner,
178 ..
179 } => FinishedExecutionError::PermanentFailure {
180 reason_inner,
181 reason_full,
182 kind: PermanentFailureKind::WorkflowTrap,
183 detail,
184 },
185 FatalError::OutOfFuel {
186 reason: reason_inner,
187 } => FinishedExecutionError::PermanentFailure {
188 reason_inner,
189 reason_full,
190 kind: PermanentFailureKind::OutOfFuel,
191 detail: None,
192 },
193 }
194 }
195}