obeli_sk_executor/
worker.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::RunId;
9use concepts::storage::DbErrorWrite;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{FinishedExecutionError, StrVariant, storage::JoinSetResponseEvent};
14use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
15use concepts::{Params, SupportedFunctionReturnValue};
16use tracing::Span;
17
18#[async_trait]
19pub trait Worker: Send + Sync + 'static {
20    async fn run(&self, ctx: WorkerContext) -> WorkerResult;
21
22    // List exported functions without extensions.
23    // Used by executor.
24    // TODO: Rename to `exported_functions_noext`
25    fn exported_functions(&self) -> &[FunctionMetadata];
26}
27
28#[must_use]
29#[derive(Debug)]
30pub enum WorkerResult {
31    Ok(
32        SupportedFunctionReturnValue,
33        Version,
34        Option<Vec<HttpClientTrace>>,
35    ),
36    // If no write occured, the watcher will timeout the execution and retry after backoff which avoids the busy loop
37    DbUpdatedByWorkerOrWatcher,
38    Err(WorkerError),
39}
40
41#[derive(Debug)]
42pub struct WorkerContext {
43    pub execution_id: ExecutionId,
44    pub run_id: RunId,
45    pub metadata: ExecutionMetadata,
46    pub ffqn: FunctionFqn,
47    pub params: Params,
48    pub event_history: Vec<HistoryEvent>,
49    pub responses: Vec<JoinSetResponseEvent>,
50    pub version: Version,
51    pub execution_deadline: DateTime<Utc>,
52    pub can_be_retried: bool,
53    pub worker_span: Span,
54}
55
56#[derive(Debug, thiserror::Error)]
57pub enum WorkerError {
58    // retriable errors
59    // Used by activity worker
60    #[error("activity {trap_kind}: {reason}")]
61    ActivityTrap {
62        reason: String,
63        trap_kind: TrapKind,
64        detail: Option<String>,
65        version: Version,
66        http_client_traces: Option<Vec<HttpClientTrace>>,
67    },
68    #[error("{reason_kind}")]
69    ActivityPreopenedDirError {
70        reason_kind: &'static str,
71        reason_inner: String,
72        version: Version,
73    },
74    // Used by activity worker, must not be returned when retries are exhausted.
75    #[error("activity returned error")]
76    ActivityReturnedError {
77        detail: Option<String>,
78        version: Version,
79        http_client_traces: Option<Vec<HttpClientTrace>>,
80    },
81    // Resources are exhausted, retry after a delay as Unlocked, without increasing temporary event count.
82    #[error("limit reached: {reason}")]
83    LimitReached { reason: String, version: Version },
84    // Used by activity worker, best effort. If this is not persisted, the expired timers watcher will append it.
85    #[error("temporary timeout")]
86    TemporaryTimeout {
87        http_client_traces: Option<Vec<HttpClientTrace>>,
88        version: Version,
89    },
90    #[error(transparent)]
91    DbError(DbErrorWrite),
92    // non-retriable errors
93    #[error("fatal error: {0}")]
94    FatalError(FatalError, Version),
95}
96
97#[derive(Debug, thiserror::Error)]
98pub enum FatalError {
99    // Used by workflow worker
100    #[error("nondeterminism detected")]
101    NondeterminismDetected { detail: String },
102    // Used by activity worker, workflow worker
103    #[error(transparent)]
104    ParamsParsingError(ParamsParsingError),
105    // Used by activity worker, workflow worker
106    #[error("cannot instantiate: {reason}")]
107    CannotInstantiate { reason: String, detail: String },
108    // Used by activity worker, workflow worker
109    #[error(transparent)]
110    ResultParsingError(ResultParsingError),
111    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
112    #[error("error calling imported function {ffqn} : {reason}")]
113    ImportedFunctionCallError {
114        ffqn: FunctionFqn,
115        reason: StrVariant,
116        detail: Option<String>,
117    },
118    /// Workflow trap if `retry_on_trap` is disabled.
119    #[error("workflow {trap_kind}: {reason}")]
120    WorkflowTrap {
121        reason: String,
122        trap_kind: TrapKind,
123        detail: Option<String>,
124    },
125    #[error("out of fuel: {reason}")]
126    OutOfFuel { reason: String },
127}
128
129impl From<FatalError> for FinishedExecutionError {
130    fn from(value: FatalError) -> Self {
131        let reason_full = value.to_string();
132        match value {
133            FatalError::NondeterminismDetected { detail } => {
134                FinishedExecutionError::PermanentFailure {
135                    reason_inner: reason_full.clone(),
136                    reason_full,
137                    kind: PermanentFailureKind::NondeterminismDetected,
138                    detail: Some(detail),
139                }
140            }
141            FatalError::ParamsParsingError(params_parsing_error) => {
142                FinishedExecutionError::PermanentFailure {
143                    reason_inner: reason_full.clone(),
144                    reason_full,
145                    kind: PermanentFailureKind::ParamsParsingError,
146                    detail: params_parsing_error.detail(),
147                }
148            }
149            FatalError::CannotInstantiate {
150                detail,
151                reason: reason_inner,
152                ..
153            } => FinishedExecutionError::PermanentFailure {
154                reason_inner,
155                reason_full,
156                kind: PermanentFailureKind::CannotInstantiate,
157                detail: Some(detail),
158            },
159            FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
160                reason_inner: reason_full.clone(),
161                reason_full,
162                kind: PermanentFailureKind::ResultParsingError,
163                detail: None,
164            },
165            FatalError::ImportedFunctionCallError {
166                detail,
167                reason: reason_inner,
168                ..
169            } => FinishedExecutionError::PermanentFailure {
170                reason_inner: reason_inner.to_string(),
171                reason_full,
172                kind: PermanentFailureKind::ImportedFunctionCallError,
173                detail,
174            },
175            FatalError::WorkflowTrap {
176                detail,
177                reason: reason_inner,
178                ..
179            } => FinishedExecutionError::PermanentFailure {
180                reason_inner,
181                reason_full,
182                kind: PermanentFailureKind::WorkflowTrap,
183                detail,
184            },
185            FatalError::OutOfFuel {
186                reason: reason_inner,
187            } => FinishedExecutionError::PermanentFailure {
188                reason_inner,
189                reason_full,
190                kind: PermanentFailureKind::OutOfFuel,
191                detail: None,
192            },
193        }
194    }
195}