obeli_sk_executor/
worker.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::ExecutionIdDerived;
9use concepts::prefixed_ulid::RunId;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{
14    FinishedExecutionError, StrVariant,
15    storage::{DbError, JoinSetResponseEvent},
16};
17use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
18use concepts::{Params, SupportedFunctionReturnValue};
19use tracing::Span;
20
21#[async_trait]
22pub trait Worker: Send + Sync + 'static {
23    async fn run(&self, ctx: WorkerContext) -> WorkerResult;
24
25    // List exported functions without extensions.
26    // Used by executor.
27    // TODO: Rename to `exported_functions_noext`
28    fn exported_functions(&self) -> &[FunctionMetadata];
29}
30
31#[must_use]
32#[derive(Debug)]
33pub enum WorkerResult {
34    Ok(
35        SupportedFunctionReturnValue,
36        Version,
37        Option<Vec<HttpClientTrace>>,
38    ),
39    DbUpdatedByWorkerOrWatcher,
40    Err(WorkerError),
41}
42
43#[derive(Debug)]
44pub struct WorkerContext {
45    pub execution_id: ExecutionId,
46    pub run_id: RunId,
47    pub metadata: ExecutionMetadata,
48    pub ffqn: FunctionFqn,
49    pub params: Params,
50    pub event_history: Vec<HistoryEvent>,
51    pub responses: Vec<JoinSetResponseEvent>,
52    pub version: Version,
53    pub execution_deadline: DateTime<Utc>,
54    pub can_be_retried: bool,
55    pub worker_span: Span,
56}
57
58#[derive(Debug, thiserror::Error)]
59pub enum WorkerError {
60    // retriable errors
61    // Used by activity worker
62    #[error("activity {trap_kind}: {reason}")]
63    ActivityTrap {
64        reason: String,
65        trap_kind: TrapKind,
66        detail: String,
67        version: Version,
68        http_client_traces: Option<Vec<HttpClientTrace>>,
69    },
70    #[error("{reason_kind}")]
71    ActivityPreopenedDirError {
72        reason_kind: &'static str,
73        reason_inner: String,
74        version: Version,
75    },
76    // Used by activity worker, must not be returned when retries are exhausted.
77    #[error("activity returned error")]
78    ActivityReturnedError {
79        detail: Option<String>,
80        version: Version,
81        http_client_traces: Option<Vec<HttpClientTrace>>,
82    },
83    /// Workflow trap when `retry_on_trap` is enabled.
84    #[error("workflow trap handled as temporary error: {reason}")]
85    TemporaryWorkflowTrap {
86        reason: String,
87        kind: TrapKind,
88        detail: Option<String>,
89        version: Version,
90    },
91    // Resources are exhausted, retry after a delay as Unlocked, without increasing temporary event count.
92    #[error("limit reached: {reason}")]
93    LimitReached { reason: String, version: Version },
94    // Used by activity worker, best effort. If this is not persisted, the expired timers watcher will append it.
95    #[error("temporary timeout")]
96    TemporaryTimeout {
97        http_client_traces: Option<Vec<HttpClientTrace>>,
98        version: Version,
99    },
100    #[error(transparent)]
101    DbError(DbError),
102    // non-retriable errors
103    #[error("fatal error: {0}")]
104    FatalError(FatalError, Version),
105}
106
107#[derive(Debug, thiserror::Error)]
108pub enum FatalError {
109    /// Used by workflow worker when directly called child execution fails.
110    #[error("child finished with an execution error: {child_execution_id}")]
111    UnhandledChildExecutionError {
112        child_execution_id: ExecutionIdDerived,
113        root_cause_id: ExecutionIdDerived,
114    },
115
116    // Used by workflow worker
117    #[error("nondeterminism detected")]
118    NondeterminismDetected { detail: String },
119    // Used by activity worker, workflow worker
120    #[error(transparent)]
121    ParamsParsingError(ParamsParsingError),
122    // Used by activity worker, workflow worker
123    #[error("cannot instantiate: {reason}")]
124    CannotInstantiate { reason: String, detail: String },
125    // Used by activity worker, workflow worker
126    #[error(transparent)]
127    ResultParsingError(ResultParsingError),
128    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
129    #[error("error calling imported function {ffqn} : {reason}")]
130    ImportedFunctionCallError {
131        ffqn: FunctionFqn,
132        reason: StrVariant,
133        detail: Option<String>,
134    },
135
136    /// Workflow trap if `retry_on_trap` is disabled.
137    #[error("workflow {trap_kind}: {reason}")]
138    WorkflowTrap {
139        reason: String,
140        trap_kind: TrapKind,
141        detail: String,
142    },
143    /// Workflow attempted to create a join set with the same name twice.
144    #[error("join set already exists with name `{name}`")]
145    JoinSetNameConflict { name: String },
146}
147
148impl From<FatalError> for FinishedExecutionError {
149    fn from(value: FatalError) -> Self {
150        let reason_full = value.to_string();
151        match value {
152            FatalError::UnhandledChildExecutionError {
153                child_execution_id,
154                root_cause_id,
155            } => FinishedExecutionError::UnhandledChildExecutionError {
156                child_execution_id,
157                root_cause_id,
158            },
159            FatalError::NondeterminismDetected { detail } => {
160                FinishedExecutionError::PermanentFailure {
161                    reason_inner: reason_full.clone(),
162                    reason_full,
163                    kind: PermanentFailureKind::NondeterminismDetected,
164                    detail: Some(detail),
165                }
166            }
167            FatalError::ParamsParsingError(params_parsing_error) => {
168                FinishedExecutionError::PermanentFailure {
169                    reason_inner: reason_full.to_string(),
170                    reason_full,
171                    kind: PermanentFailureKind::ParamsParsingError,
172                    detail: params_parsing_error.detail(),
173                }
174            }
175            FatalError::CannotInstantiate {
176                detail,
177                reason: reason_inner,
178                ..
179            } => FinishedExecutionError::PermanentFailure {
180                reason_inner,
181                reason_full,
182                kind: PermanentFailureKind::CannotInstantiate,
183                detail: Some(detail),
184            },
185            FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
186                reason_inner: reason_full.to_string(),
187                reason_full,
188                kind: PermanentFailureKind::ResultParsingError,
189                detail: None,
190            },
191            FatalError::ImportedFunctionCallError {
192                detail,
193                reason: reason_inner,
194                ..
195            } => FinishedExecutionError::PermanentFailure {
196                reason_inner: reason_inner.to_string(),
197                reason_full,
198                kind: PermanentFailureKind::ImportedFunctionCallError,
199                detail,
200            },
201            FatalError::WorkflowTrap {
202                detail,
203                reason: reason_inner,
204                ..
205            } => FinishedExecutionError::PermanentFailure {
206                reason_inner,
207                reason_full,
208                kind: PermanentFailureKind::WorkflowTrap,
209                detail: Some(detail),
210            },
211            FatalError::JoinSetNameConflict { name } => FinishedExecutionError::PermanentFailure {
212                reason_inner: name,
213                reason_full,
214                kind: PermanentFailureKind::JoinSetNameConflict,
215                detail: None,
216            },
217        }
218    }
219}