obeli_sk_executor/
worker.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::RunId;
9use concepts::storage::HistoryEvent;
10use concepts::storage::Version;
11use concepts::storage::http_client_trace::HttpClientTrace;
12use concepts::{
13    FinishedExecutionError, StrVariant,
14    storage::{DbError, JoinSetResponseEvent},
15};
16use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
17use concepts::{Params, SupportedFunctionReturnValue};
18use tracing::Span;
19
20#[async_trait]
21pub trait Worker: Send + Sync + 'static {
22    async fn run(&self, ctx: WorkerContext) -> WorkerResult;
23
24    // List exported functions without extensions.
25    // Used by executor.
26    // TODO: Rename to `exported_functions_noext`
27    fn exported_functions(&self) -> &[FunctionMetadata];
28}
29
30#[must_use]
31#[derive(Debug)]
32pub enum WorkerResult {
33    Ok(
34        SupportedFunctionReturnValue,
35        Version,
36        Option<Vec<HttpClientTrace>>,
37    ),
38    // If no write occured, the watcher will timeout the execution and retry after backoff which avoids the busy loop
39    DbUpdatedByWorkerOrWatcher,
40    Err(WorkerError),
41}
42
43#[derive(Debug)]
44pub struct WorkerContext {
45    pub execution_id: ExecutionId,
46    pub run_id: RunId,
47    pub metadata: ExecutionMetadata,
48    pub ffqn: FunctionFqn,
49    pub params: Params,
50    pub event_history: Vec<HistoryEvent>,
51    pub responses: Vec<JoinSetResponseEvent>,
52    pub version: Version,
53    pub execution_deadline: DateTime<Utc>,
54    pub can_be_retried: bool,
55    pub worker_span: Span,
56}
57
58#[derive(Debug, thiserror::Error)]
59pub enum WorkerError {
60    // retriable errors
61    // Used by activity worker
62    #[error("activity {trap_kind}: {reason}")]
63    ActivityTrap {
64        reason: String,
65        trap_kind: TrapKind,
66        detail: Option<String>,
67        version: Version,
68        http_client_traces: Option<Vec<HttpClientTrace>>,
69    },
70    #[error("{reason_kind}")]
71    ActivityPreopenedDirError {
72        reason_kind: &'static str,
73        reason_inner: String,
74        version: Version,
75    },
76    // Used by activity worker, must not be returned when retries are exhausted.
77    #[error("activity returned error")]
78    ActivityReturnedError {
79        detail: Option<String>,
80        version: Version,
81        http_client_traces: Option<Vec<HttpClientTrace>>,
82    },
83    // Resources are exhausted, retry after a delay as Unlocked, without increasing temporary event count.
84    #[error("limit reached: {reason}")]
85    LimitReached { reason: String, version: Version },
86    // Used by activity worker, best effort. If this is not persisted, the expired timers watcher will append it.
87    #[error("temporary timeout")]
88    TemporaryTimeout {
89        http_client_traces: Option<Vec<HttpClientTrace>>,
90        version: Version,
91    },
92    #[error(transparent)]
93    DbError(DbError),
94    // non-retriable errors
95    #[error("fatal error: {0}")]
96    FatalError(FatalError, Version),
97}
98
99#[derive(Debug, thiserror::Error)]
100pub enum FatalError {
101    // Used by workflow worker
102    #[error("nondeterminism detected")]
103    NondeterminismDetected { detail: String },
104    // Used by activity worker, workflow worker
105    #[error(transparent)]
106    ParamsParsingError(ParamsParsingError),
107    // Used by activity worker, workflow worker
108    #[error("cannot instantiate: {reason}")]
109    CannotInstantiate { reason: String, detail: String },
110    // Used by activity worker, workflow worker
111    #[error(transparent)]
112    ResultParsingError(ResultParsingError),
113    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
114    #[error("error calling imported function {ffqn} : {reason}")]
115    ImportedFunctionCallError {
116        ffqn: FunctionFqn,
117        reason: StrVariant,
118        detail: Option<String>,
119    },
120    /// Workflow trap if `retry_on_trap` is disabled.
121    #[error("workflow {trap_kind}: {reason}")]
122    WorkflowTrap {
123        reason: String,
124        trap_kind: TrapKind,
125        detail: Option<String>,
126    },
127    #[error("out of fuel: {reason}")]
128    OutOfFuel { reason: String },
129}
130
131impl From<FatalError> for FinishedExecutionError {
132    fn from(value: FatalError) -> Self {
133        let reason_full = value.to_string();
134        match value {
135            FatalError::NondeterminismDetected { detail } => {
136                FinishedExecutionError::PermanentFailure {
137                    reason_inner: reason_full.clone(),
138                    reason_full,
139                    kind: PermanentFailureKind::NondeterminismDetected,
140                    detail: Some(detail),
141                }
142            }
143            FatalError::ParamsParsingError(params_parsing_error) => {
144                FinishedExecutionError::PermanentFailure {
145                    reason_inner: reason_full.to_string(),
146                    reason_full,
147                    kind: PermanentFailureKind::ParamsParsingError,
148                    detail: params_parsing_error.detail(),
149                }
150            }
151            FatalError::CannotInstantiate {
152                detail,
153                reason: reason_inner,
154                ..
155            } => FinishedExecutionError::PermanentFailure {
156                reason_inner,
157                reason_full,
158                kind: PermanentFailureKind::CannotInstantiate,
159                detail: Some(detail),
160            },
161            FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
162                reason_inner: reason_full.to_string(),
163                reason_full,
164                kind: PermanentFailureKind::ResultParsingError,
165                detail: None,
166            },
167            FatalError::ImportedFunctionCallError {
168                detail,
169                reason: reason_inner,
170                ..
171            } => FinishedExecutionError::PermanentFailure {
172                reason_inner: reason_inner.to_string(),
173                reason_full,
174                kind: PermanentFailureKind::ImportedFunctionCallError,
175                detail,
176            },
177            FatalError::WorkflowTrap {
178                detail,
179                reason: reason_inner,
180                ..
181            } => FinishedExecutionError::PermanentFailure {
182                reason_inner,
183                reason_full,
184                kind: PermanentFailureKind::WorkflowTrap,
185                detail,
186            },
187            FatalError::OutOfFuel {
188                reason: reason_inner,
189            } => FinishedExecutionError::PermanentFailure {
190                reason_inner,
191                reason_full,
192                kind: PermanentFailureKind::OutOfFuel,
193                detail: None,
194            },
195        }
196    }
197}