obeli_sk_executor/
worker.rs

1use async_trait::async_trait;
2use concepts::ExecutionId;
3use concepts::ExecutionMetadata;
4use concepts::FunctionMetadata;
5use concepts::PermanentFailureKind;
6use concepts::TrapKind;
7use concepts::storage::DbErrorWrite;
8use concepts::storage::HistoryEvent;
9use concepts::storage::Locked;
10use concepts::storage::Version;
11use concepts::storage::http_client_trace::HttpClientTrace;
12use concepts::{FinishedExecutionError, StrVariant, storage::JoinSetResponseEvent};
13use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
14use concepts::{Params, SupportedFunctionReturnValue};
15use tracing::Span;
16
17#[async_trait]
18pub trait Worker: Send + Sync + 'static {
19    async fn run(&self, ctx: WorkerContext) -> WorkerResult;
20
21    // List exported functions without extensions.
22    // Used by executor.
23    // TODO: Rename to `exported_functions_noext`
24    fn exported_functions(&self) -> &[FunctionMetadata];
25}
26
27#[must_use]
28#[derive(Debug)]
29pub enum WorkerResult {
30    Ok(
31        SupportedFunctionReturnValue,
32        Version,
33        Option<Vec<HttpClientTrace>>,
34    ),
35    // If no write occured, the watcher will timeout the execution and retry after backoff which avoids the busy loop
36    DbUpdatedByWorkerOrWatcher,
37    Err(WorkerError),
38}
39
40#[derive(Debug)]
41pub struct WorkerContext {
42    pub execution_id: ExecutionId,
43    pub metadata: ExecutionMetadata,
44    pub ffqn: FunctionFqn,
45    pub params: Params,
46    pub event_history: Vec<HistoryEvent>,
47    pub responses: Vec<JoinSetResponseEvent>,
48    pub version: Version,
49    pub can_be_retried: bool,
50    pub worker_span: Span,
51    pub locked_event: Locked,
52}
53
54#[derive(Debug, thiserror::Error)]
55pub enum WorkerError {
56    // retriable errors
57    // Used by activity worker
58    #[error("activity {trap_kind}: {reason}")]
59    ActivityTrap {
60        reason: String,
61        trap_kind: TrapKind,
62        detail: Option<String>,
63        version: Version,
64        http_client_traces: Option<Vec<HttpClientTrace>>,
65    },
66    #[error("{reason_kind}")]
67    ActivityPreopenedDirError {
68        reason_kind: &'static str,
69        reason_inner: String,
70        version: Version,
71    },
72    // Used by activity worker, must not be returned when retries are exhausted.
73    #[error("activity returned error")]
74    ActivityReturnedError {
75        detail: Option<String>,
76        version: Version,
77        http_client_traces: Option<Vec<HttpClientTrace>>,
78    },
79    // Resources are exhausted, retry after a delay as Unlocked, without increasing temporary event count.
80    #[error("limit reached: {reason}")]
81    LimitReached { reason: String, version: Version },
82    // Used by activity worker, best effort. If this is not persisted, the expired timers watcher will append it.
83    #[error("temporary timeout")]
84    TemporaryTimeout {
85        http_client_traces: Option<Vec<HttpClientTrace>>,
86        version: Version,
87    },
88    #[error(transparent)]
89    DbError(DbErrorWrite),
90    // non-retriable errors
91    #[error("fatal error: {0}")]
92    FatalError(FatalError, Version),
93}
94
95#[derive(Debug, thiserror::Error)]
96pub enum FatalError {
97    // Used by workflow worker
98    #[error("nondeterminism detected")]
99    NondeterminismDetected { detail: String },
100    // Used by activity worker, workflow worker
101    #[error(transparent)]
102    ParamsParsingError(ParamsParsingError),
103    // Used by activity worker, workflow worker
104    #[error("cannot instantiate: {reason}")]
105    CannotInstantiate { reason: String, detail: String },
106    // Used by activity worker, workflow worker
107    #[error(transparent)]
108    ResultParsingError(ResultParsingError),
109    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
110    #[error("error calling imported function {ffqn} : {reason}")]
111    ImportedFunctionCallError {
112        ffqn: FunctionFqn,
113        reason: StrVariant,
114        detail: Option<String>,
115    },
116    /// Workflow trap if `retry_on_trap` is disabled.
117    #[error("workflow {trap_kind}: {reason}")]
118    WorkflowTrap {
119        reason: String,
120        trap_kind: TrapKind,
121        detail: Option<String>,
122    },
123    #[error("out of fuel: {reason}")]
124    OutOfFuel { reason: String },
125}
126
127impl From<FatalError> for FinishedExecutionError {
128    fn from(value: FatalError) -> Self {
129        let reason_full = value.to_string();
130        match value {
131            FatalError::NondeterminismDetected { detail } => {
132                FinishedExecutionError::PermanentFailure {
133                    reason_inner: reason_full.clone(),
134                    reason_full,
135                    kind: PermanentFailureKind::NondeterminismDetected,
136                    detail: Some(detail),
137                }
138            }
139            FatalError::ParamsParsingError(params_parsing_error) => {
140                FinishedExecutionError::PermanentFailure {
141                    reason_inner: reason_full.clone(),
142                    reason_full,
143                    kind: PermanentFailureKind::ParamsParsingError,
144                    detail: params_parsing_error.detail(),
145                }
146            }
147            FatalError::CannotInstantiate {
148                detail,
149                reason: reason_inner,
150                ..
151            } => FinishedExecutionError::PermanentFailure {
152                reason_inner,
153                reason_full,
154                kind: PermanentFailureKind::CannotInstantiate,
155                detail: Some(detail),
156            },
157            FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
158                reason_inner: reason_full.clone(),
159                reason_full,
160                kind: PermanentFailureKind::ResultParsingError,
161                detail: None,
162            },
163            FatalError::ImportedFunctionCallError {
164                detail,
165                reason: reason_inner,
166                ..
167            } => FinishedExecutionError::PermanentFailure {
168                reason_inner: reason_inner.to_string(),
169                reason_full,
170                kind: PermanentFailureKind::ImportedFunctionCallError,
171                detail,
172            },
173            FatalError::WorkflowTrap {
174                detail,
175                reason: reason_inner,
176                ..
177            } => FinishedExecutionError::PermanentFailure {
178                reason_inner,
179                reason_full,
180                kind: PermanentFailureKind::WorkflowTrap,
181                detail,
182            },
183            FatalError::OutOfFuel {
184                reason: reason_inner,
185            } => FinishedExecutionError::PermanentFailure {
186                reason_inner,
187                reason_full,
188                kind: PermanentFailureKind::OutOfFuel,
189                detail: None,
190            },
191        }
192    }
193}