1use async_trait::async_trait;
2use concepts::ExecutionFailureKind;
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::TrapKind;
7use concepts::storage::DbErrorWrite;
8use concepts::storage::HistoryEvent;
9use concepts::storage::Locked;
10use concepts::storage::Version;
11use concepts::storage::http_client_trace::HttpClientTrace;
12use concepts::{FinishedExecutionError, StrVariant, storage::JoinSetResponseEvent};
13use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
14use concepts::{Params, SupportedFunctionReturnValue};
15use tracing::Span;
16
17#[async_trait]
18pub trait Worker: Send + Sync + 'static {
19 async fn run(&self, ctx: WorkerContext) -> WorkerResult;
20
21 fn exported_functions(&self) -> &[FunctionMetadata];
25}
26
27#[must_use]
28#[derive(Debug)]
29pub enum WorkerResult {
30 Ok(
31 SupportedFunctionReturnValue,
32 Version,
33 Option<Vec<HttpClientTrace>>,
34 ),
35 DbUpdatedByWorkerOrWatcher,
37 Err(WorkerError),
38}
39
40#[derive(Debug)]
41pub struct WorkerContext {
42 pub execution_id: ExecutionId,
43 pub metadata: ExecutionMetadata,
44 pub ffqn: FunctionFqn,
45 pub params: Params,
46 pub event_history: Vec<(HistoryEvent, Version)>,
47 pub responses: Vec<JoinSetResponseEvent>,
48 pub version: Version,
49 pub can_be_retried: bool,
50 pub worker_span: Span,
51 pub locked_event: Locked,
52}
53
54#[derive(Debug, thiserror::Error)]
55pub enum WorkerError {
56 #[error("activity {trap_kind}: {reason}")]
59 ActivityTrap {
60 reason: String,
61 trap_kind: TrapKind,
62 detail: Option<String>,
63 version: Version,
64 http_client_traces: Option<Vec<HttpClientTrace>>,
65 },
66 #[error("{reason}")]
67 ActivityPreopenedDirError {
68 reason: String,
69 detail: String,
70 version: Version,
71 },
72 #[error("activity returned error")]
74 ActivityReturnedError {
75 detail: Option<String>,
76 version: Version,
77 http_client_traces: Option<Vec<HttpClientTrace>>,
78 },
79 #[error("limit reached: {reason}")]
81 LimitReached { reason: String, version: Version },
82 #[error("temporary timeout")]
84 TemporaryTimeout {
85 http_client_traces: Option<Vec<HttpClientTrace>>,
86 version: Version,
87 },
88 #[error(transparent)]
89 DbError(DbErrorWrite),
90 #[error("fatal error: {0}")]
92 FatalError(FatalError, Version),
93}
94
95#[derive(Debug, thiserror::Error)]
96pub enum FatalError {
97 #[error("nondeterminism detected")]
99 NondeterminismDetected { detail: String },
100 #[error(transparent)]
102 ParamsParsingError(ParamsParsingError),
103 #[error("cannot instantiate: {reason}")]
105 CannotInstantiate { reason: String, detail: String },
106 #[error(transparent)]
108 ResultParsingError(ResultParsingError),
109 #[error("error calling imported function {ffqn} : {reason}")]
111 ImportedFunctionCallError {
112 ffqn: FunctionFqn,
113 reason: StrVariant,
114 detail: Option<String>,
115 },
116 #[error("workflow {trap_kind}: {reason}")]
118 WorkflowTrap {
119 reason: String,
120 trap_kind: TrapKind,
121 detail: Option<String>,
122 },
123 #[error("out of fuel: {reason}")]
124 OutOfFuel { reason: String },
125 #[error("constraint violation: {reason}")]
126 ConstraintViolation { reason: StrVariant },
127}
128
129impl From<FatalError> for FinishedExecutionError {
130 fn from(err: FatalError) -> Self {
131 let reason_generic = err.to_string(); match err {
133 FatalError::NondeterminismDetected { detail } => FinishedExecutionError {
134 reason: None,
135 kind: ExecutionFailureKind::NondeterminismDetected,
136 detail: Some(detail),
137 },
138 FatalError::OutOfFuel { reason } => FinishedExecutionError {
139 reason: Some(reason),
140 kind: ExecutionFailureKind::OutOfFuel,
141 detail: None,
142 },
143 FatalError::ParamsParsingError(err) => FinishedExecutionError {
144 reason: Some(reason_generic),
145 kind: ExecutionFailureKind::Uncategorized,
146 detail: err.detail(),
147 },
148 FatalError::CannotInstantiate { reason: _, detail } => FinishedExecutionError {
149 reason: Some(reason_generic),
150 kind: ExecutionFailureKind::Uncategorized,
151 detail: Some(detail),
152 },
153 FatalError::ResultParsingError(_) | FatalError::ConstraintViolation { reason: _ } => {
154 FinishedExecutionError {
155 reason: Some(reason_generic),
156 kind: ExecutionFailureKind::Uncategorized,
157 detail: None,
158 }
159 }
160 FatalError::ImportedFunctionCallError { detail, .. }
161 | FatalError::WorkflowTrap { detail, .. } => FinishedExecutionError {
162 reason: Some(reason_generic),
163 kind: ExecutionFailureKind::Uncategorized,
164 detail,
165 },
166 }
167 }
168}