obeli_sk_executor/
worker.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::ExecutionIdDerived;
9use concepts::prefixed_ulid::RunId;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{
14    FinishedExecutionError, StrVariant,
15    storage::{DbError, JoinSetResponseEvent},
16};
17use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
18use concepts::{Params, SupportedFunctionReturnValue};
19use tracing::Span;
20
21#[async_trait]
22pub trait Worker: Send + Sync + 'static {
23    async fn run(&self, ctx: WorkerContext) -> WorkerResult;
24
25    // List exported functions without extensions.
26    // Used by executor.
27    // TODO: Rename to `exported_functions_noext`
28    fn exported_functions(&self) -> &[FunctionMetadata];
29}
30
31#[must_use]
32#[derive(Debug)]
33pub enum WorkerResult {
34    Ok(
35        SupportedFunctionReturnValue,
36        Version,
37        Option<Vec<HttpClientTrace>>,
38    ),
39    // If no write occured, the watcher will timeout the execution and retry after backoff which avoids the busy loop
40    DbUpdatedByWorkerOrWatcher,
41    Err(WorkerError),
42}
43
44#[derive(Debug)]
45pub struct WorkerContext {
46    pub execution_id: ExecutionId,
47    pub run_id: RunId,
48    pub metadata: ExecutionMetadata,
49    pub ffqn: FunctionFqn,
50    pub params: Params,
51    pub event_history: Vec<HistoryEvent>,
52    pub responses: Vec<JoinSetResponseEvent>,
53    pub version: Version,
54    pub execution_deadline: DateTime<Utc>,
55    pub can_be_retried: bool,
56    pub worker_span: Span,
57}
58
59#[derive(Debug, thiserror::Error)]
60pub enum WorkerError {
61    // retriable errors
62    // Used by activity worker
63    #[error("activity {trap_kind}: {reason}")]
64    ActivityTrap {
65        reason: String,
66        trap_kind: TrapKind,
67        detail: Option<String>,
68        version: Version,
69        http_client_traces: Option<Vec<HttpClientTrace>>,
70    },
71    #[error("{reason_kind}")]
72    ActivityPreopenedDirError {
73        reason_kind: &'static str,
74        reason_inner: String,
75        version: Version,
76    },
77    // Used by activity worker, must not be returned when retries are exhausted.
78    #[error("activity returned error")]
79    ActivityReturnedError {
80        detail: Option<String>,
81        version: Version,
82        http_client_traces: Option<Vec<HttpClientTrace>>,
83    },
84    // Resources are exhausted, retry after a delay as Unlocked, without increasing temporary event count.
85    #[error("limit reached: {reason}")]
86    LimitReached { reason: String, version: Version },
87    // Used by activity worker, best effort. If this is not persisted, the expired timers watcher will append it.
88    #[error("temporary timeout")]
89    TemporaryTimeout {
90        http_client_traces: Option<Vec<HttpClientTrace>>,
91        version: Version,
92    },
93    #[error(transparent)]
94    DbError(DbError),
95    // non-retriable errors
96    #[error("fatal error: {0}")]
97    FatalError(FatalError, Version),
98}
99
100#[derive(Debug, thiserror::Error)]
101pub enum FatalError {
102    /// Used by workflow worker when directly called child execution fails.
103    #[error("child finished with an execution error: {child_execution_id}")]
104    UnhandledChildExecutionError {
105        child_execution_id: ExecutionIdDerived,
106        root_cause_id: ExecutionIdDerived,
107    },
108    // Used by workflow worker
109    #[error("nondeterminism detected")]
110    NondeterminismDetected { detail: String },
111    // Used by activity worker, workflow worker
112    #[error(transparent)]
113    ParamsParsingError(ParamsParsingError),
114    // Used by activity worker, workflow worker
115    #[error("cannot instantiate: {reason}")]
116    CannotInstantiate { reason: String, detail: String },
117    // Used by activity worker, workflow worker
118    #[error(transparent)]
119    ResultParsingError(ResultParsingError),
120    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
121    #[error("error calling imported function {ffqn} : {reason}")]
122    ImportedFunctionCallError {
123        ffqn: FunctionFqn,
124        reason: StrVariant,
125        detail: Option<String>,
126    },
127    /// Workflow trap if `retry_on_trap` is disabled.
128    #[error("workflow {trap_kind}: {reason}")]
129    WorkflowTrap {
130        reason: String,
131        trap_kind: TrapKind,
132        detail: Option<String>,
133    },
134    #[error("out of fuel: {reason}")]
135    OutOfFuel { reason: String },
136}
137
138impl From<FatalError> for FinishedExecutionError {
139    fn from(value: FatalError) -> Self {
140        let reason_full = value.to_string();
141        match value {
142            FatalError::UnhandledChildExecutionError {
143                child_execution_id,
144                root_cause_id,
145            } => FinishedExecutionError::UnhandledChildExecutionError {
146                child_execution_id,
147                root_cause_id,
148            },
149            FatalError::NondeterminismDetected { detail } => {
150                FinishedExecutionError::PermanentFailure {
151                    reason_inner: reason_full.clone(),
152                    reason_full,
153                    kind: PermanentFailureKind::NondeterminismDetected,
154                    detail: Some(detail),
155                }
156            }
157            FatalError::ParamsParsingError(params_parsing_error) => {
158                FinishedExecutionError::PermanentFailure {
159                    reason_inner: reason_full.to_string(),
160                    reason_full,
161                    kind: PermanentFailureKind::ParamsParsingError,
162                    detail: params_parsing_error.detail(),
163                }
164            }
165            FatalError::CannotInstantiate {
166                detail,
167                reason: reason_inner,
168                ..
169            } => FinishedExecutionError::PermanentFailure {
170                reason_inner,
171                reason_full,
172                kind: PermanentFailureKind::CannotInstantiate,
173                detail: Some(detail),
174            },
175            FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
176                reason_inner: reason_full.to_string(),
177                reason_full,
178                kind: PermanentFailureKind::ResultParsingError,
179                detail: None,
180            },
181            FatalError::ImportedFunctionCallError {
182                detail,
183                reason: reason_inner,
184                ..
185            } => FinishedExecutionError::PermanentFailure {
186                reason_inner: reason_inner.to_string(),
187                reason_full,
188                kind: PermanentFailureKind::ImportedFunctionCallError,
189                detail,
190            },
191            FatalError::WorkflowTrap {
192                detail,
193                reason: reason_inner,
194                ..
195            } => FinishedExecutionError::PermanentFailure {
196                reason_inner,
197                reason_full,
198                kind: PermanentFailureKind::WorkflowTrap,
199                detail,
200            },
201            FatalError::OutOfFuel {
202                reason: reason_inner,
203            } => FinishedExecutionError::PermanentFailure {
204                reason_inner,
205                reason_full,
206                kind: PermanentFailureKind::OutOfFuel,
207                detail: None,
208            },
209        }
210    }
211}