1use async_trait::async_trait;
2use concepts::ExecutionFailureKind;
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::TrapKind;
7use concepts::storage::DbErrorWrite;
8use concepts::storage::HistoryEvent;
9use concepts::storage::Locked;
10use concepts::storage::Version;
11use concepts::storage::http_client_trace::HttpClientTrace;
12use concepts::{FinishedExecutionError, StrVariant, storage::JoinSetResponseEvent};
13use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
14use concepts::{Params, SupportedFunctionReturnValue};
15use tracing::Span;
16
17#[async_trait]
18pub trait Worker: Send + Sync + 'static {
19 async fn run(&self, ctx: WorkerContext) -> WorkerResult;
20
21 fn exported_functions(&self) -> &[FunctionMetadata];
25}
26
27#[must_use]
28#[derive(Debug)]
29pub enum WorkerResult {
30 Ok(
31 SupportedFunctionReturnValue,
32 Version,
33 Option<Vec<HttpClientTrace>>,
34 ),
35 DbUpdatedByWorkerOrWatcher,
37 Err(WorkerError),
38}
39
40#[derive(Debug)]
41pub struct WorkerContext {
42 pub execution_id: ExecutionId,
43 pub metadata: ExecutionMetadata,
44 pub ffqn: FunctionFqn,
45 pub params: Params,
46 pub event_history: Vec<(HistoryEvent, Version)>,
47 pub responses: Vec<JoinSetResponseEvent>,
48 pub version: Version,
49 pub can_be_retried: bool,
50 pub worker_span: Span,
51 pub locked_event: Locked,
52}
53
54#[derive(Debug, thiserror::Error)]
55pub enum WorkerError {
56 #[error("activity {trap_kind}: {reason}")]
59 ActivityTrap {
60 reason: String,
61 trap_kind: TrapKind,
62 detail: Option<String>,
63 version: Version,
64 http_client_traces: Option<Vec<HttpClientTrace>>,
65 },
66 #[error("{reason}")]
67 ActivityPreopenedDirError {
68 reason: String,
69 detail: String,
70 version: Version,
71 },
72 #[error("activity returned error")]
74 ActivityReturnedError {
75 detail: Option<String>,
76 version: Version,
77 http_client_traces: Option<Vec<HttpClientTrace>>,
78 },
79 #[error("limit reached: {reason}")]
81 LimitReached { reason: String, version: Version },
82 #[error("temporary timeout")]
84 TemporaryTimeout {
85 http_client_traces: Option<Vec<HttpClientTrace>>,
86 version: Version,
87 },
88 #[error(transparent)]
89 DbError(DbErrorWrite),
90 #[error("fatal error: {0}")]
92 FatalError(FatalError, Version),
93}
94
95#[derive(Debug, thiserror::Error)]
96pub enum FatalError {
97 #[error("nondeterminism detected")]
99 NondeterminismDetected { detail: String },
100 #[error(transparent)]
102 ParamsParsingError(ParamsParsingError),
103 #[error("cannot instantiate: {reason}")]
105 CannotInstantiate { reason: String, detail: String },
106 #[error(transparent)]
108 ResultParsingError(ResultParsingError),
109 #[error("error calling imported function {ffqn} : {reason}")]
111 ImportedFunctionCallError {
112 ffqn: FunctionFqn,
113 reason: StrVariant,
114 detail: Option<String>,
115 },
116 #[error("workflow {trap_kind}: {reason}")]
118 WorkflowTrap {
119 reason: String,
120 trap_kind: TrapKind,
121 detail: Option<String>,
122 },
123 #[error("out of fuel: {reason}")]
124 OutOfFuel { reason: String },
125 #[error("constraint violation: {reason}")]
126 ConstraintViolation { reason: StrVariant },
127
128 #[error("cancelled")]
129 Cancelled,
130}
131
132impl From<FatalError> for FinishedExecutionError {
133 fn from(err: FatalError) -> Self {
134 let reason_generic = err.to_string(); match err {
136 FatalError::NondeterminismDetected { detail } => FinishedExecutionError {
137 reason: None,
138 kind: ExecutionFailureKind::NondeterminismDetected,
139 detail: Some(detail),
140 },
141 FatalError::OutOfFuel { reason } => FinishedExecutionError {
142 reason: Some(reason),
143 kind: ExecutionFailureKind::OutOfFuel,
144 detail: None,
145 },
146 FatalError::ParamsParsingError(err) => FinishedExecutionError {
147 reason: Some(reason_generic),
148 kind: ExecutionFailureKind::Uncategorized,
149 detail: err.detail(),
150 },
151 FatalError::CannotInstantiate { reason: _, detail } => FinishedExecutionError {
152 reason: Some(reason_generic),
153 kind: ExecutionFailureKind::Uncategorized,
154 detail: Some(detail),
155 },
156 FatalError::ResultParsingError(_) | FatalError::ConstraintViolation { reason: _ } => {
157 FinishedExecutionError {
158 reason: Some(reason_generic),
159 kind: ExecutionFailureKind::Uncategorized,
160 detail: None,
161 }
162 }
163 FatalError::ImportedFunctionCallError { detail, .. }
164 | FatalError::WorkflowTrap { detail, .. } => FinishedExecutionError {
165 reason: Some(reason_generic),
166 kind: ExecutionFailureKind::Uncategorized,
167 detail,
168 },
169 FatalError::Cancelled => FinishedExecutionError {
170 kind: ExecutionFailureKind::Cancelled,
171 reason: None,
172 detail: None,
173 },
174 }
175 }
176}