1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::RunId;
9use concepts::storage::HistoryEvent;
10use concepts::storage::Version;
11use concepts::storage::http_client_trace::HttpClientTrace;
12use concepts::{
13 FinishedExecutionError, StrVariant,
14 storage::{DbError, JoinSetResponseEvent},
15};
16use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
17use concepts::{Params, SupportedFunctionReturnValue};
18use tracing::Span;
19
20#[async_trait]
21pub trait Worker: Send + Sync + 'static {
22 async fn run(&self, ctx: WorkerContext) -> WorkerResult;
23
24 fn exported_functions(&self) -> &[FunctionMetadata];
28}
29
30#[must_use]
31#[derive(Debug)]
32pub enum WorkerResult {
33 Ok(
34 SupportedFunctionReturnValue,
35 Version,
36 Option<Vec<HttpClientTrace>>,
37 ),
38 DbUpdatedByWorkerOrWatcher,
40 Err(WorkerError),
41}
42
43#[derive(Debug)]
44pub struct WorkerContext {
45 pub execution_id: ExecutionId,
46 pub run_id: RunId,
47 pub metadata: ExecutionMetadata,
48 pub ffqn: FunctionFqn,
49 pub params: Params,
50 pub event_history: Vec<HistoryEvent>,
51 pub responses: Vec<JoinSetResponseEvent>,
52 pub version: Version,
53 pub execution_deadline: DateTime<Utc>,
54 pub can_be_retried: bool,
55 pub worker_span: Span,
56}
57
58#[derive(Debug, thiserror::Error)]
59pub enum WorkerError {
60 #[error("activity {trap_kind}: {reason}")]
63 ActivityTrap {
64 reason: String,
65 trap_kind: TrapKind,
66 detail: Option<String>,
67 version: Version,
68 http_client_traces: Option<Vec<HttpClientTrace>>,
69 },
70 #[error("{reason_kind}")]
71 ActivityPreopenedDirError {
72 reason_kind: &'static str,
73 reason_inner: String,
74 version: Version,
75 },
76 #[error("activity returned error")]
78 ActivityReturnedError {
79 detail: Option<String>,
80 version: Version,
81 http_client_traces: Option<Vec<HttpClientTrace>>,
82 },
83 #[error("limit reached: {reason}")]
85 LimitReached { reason: String, version: Version },
86 #[error("temporary timeout")]
88 TemporaryTimeout {
89 http_client_traces: Option<Vec<HttpClientTrace>>,
90 version: Version,
91 },
92 #[error(transparent)]
93 DbError(DbError),
94 #[error("fatal error: {0}")]
96 FatalError(FatalError, Version),
97}
98
99#[derive(Debug, thiserror::Error)]
100pub enum FatalError {
101 #[error("nondeterminism detected")]
103 NondeterminismDetected { detail: String },
104 #[error(transparent)]
106 ParamsParsingError(ParamsParsingError),
107 #[error("cannot instantiate: {reason}")]
109 CannotInstantiate { reason: String, detail: String },
110 #[error(transparent)]
112 ResultParsingError(ResultParsingError),
113 #[error("error calling imported function {ffqn} : {reason}")]
115 ImportedFunctionCallError {
116 ffqn: FunctionFqn,
117 reason: StrVariant,
118 detail: Option<String>,
119 },
120 #[error("workflow {trap_kind}: {reason}")]
122 WorkflowTrap {
123 reason: String,
124 trap_kind: TrapKind,
125 detail: Option<String>,
126 },
127 #[error("out of fuel: {reason}")]
128 OutOfFuel { reason: String },
129}
130
131impl From<FatalError> for FinishedExecutionError {
132 fn from(value: FatalError) -> Self {
133 let reason_full = value.to_string();
134 match value {
135 FatalError::NondeterminismDetected { detail } => {
136 FinishedExecutionError::PermanentFailure {
137 reason_inner: reason_full.clone(),
138 reason_full,
139 kind: PermanentFailureKind::NondeterminismDetected,
140 detail: Some(detail),
141 }
142 }
143 FatalError::ParamsParsingError(params_parsing_error) => {
144 FinishedExecutionError::PermanentFailure {
145 reason_inner: reason_full.to_string(),
146 reason_full,
147 kind: PermanentFailureKind::ParamsParsingError,
148 detail: params_parsing_error.detail(),
149 }
150 }
151 FatalError::CannotInstantiate {
152 detail,
153 reason: reason_inner,
154 ..
155 } => FinishedExecutionError::PermanentFailure {
156 reason_inner,
157 reason_full,
158 kind: PermanentFailureKind::CannotInstantiate,
159 detail: Some(detail),
160 },
161 FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
162 reason_inner: reason_full.to_string(),
163 reason_full,
164 kind: PermanentFailureKind::ResultParsingError,
165 detail: None,
166 },
167 FatalError::ImportedFunctionCallError {
168 detail,
169 reason: reason_inner,
170 ..
171 } => FinishedExecutionError::PermanentFailure {
172 reason_inner: reason_inner.to_string(),
173 reason_full,
174 kind: PermanentFailureKind::ImportedFunctionCallError,
175 detail,
176 },
177 FatalError::WorkflowTrap {
178 detail,
179 reason: reason_inner,
180 ..
181 } => FinishedExecutionError::PermanentFailure {
182 reason_inner,
183 reason_full,
184 kind: PermanentFailureKind::WorkflowTrap,
185 detail,
186 },
187 FatalError::OutOfFuel {
188 reason: reason_inner,
189 } => FinishedExecutionError::PermanentFailure {
190 reason_inner,
191 reason_full,
192 kind: PermanentFailureKind::OutOfFuel,
193 detail: None,
194 },
195 }
196 }
197}