obeli_sk_executor/
worker.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::prefixed_ulid::ExecutionIdDerived;
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::HistoryEvent;
6use concepts::storage::Version;
7use concepts::ExecutionId;
8use concepts::ExecutionMetadata;
9use concepts::FunctionMetadata;
10use concepts::PermanentFailureKind;
11use concepts::TrapKind;
12use concepts::{
13    storage::{DbError, JoinSetResponseEvent},
14    FinishedExecutionError, StrVariant,
15};
16use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
17use concepts::{Params, SupportedFunctionReturnValue};
18use tracing::Span;
19
20#[async_trait]
21pub trait Worker: Send + Sync + 'static {
22    async fn run(&self, ctx: WorkerContext) -> WorkerResult;
23
24    fn exported_functions(&self) -> &[FunctionMetadata];
25
26    fn imported_functions(&self) -> &[FunctionMetadata];
27}
28
29#[must_use]
30#[derive(Debug)]
31pub enum WorkerResult {
32    Ok(SupportedFunctionReturnValue, Version),
33    DbUpdatedByWorker,
34    Err(WorkerError),
35}
36
37#[derive(Debug)]
38pub struct WorkerContext {
39    pub execution_id: ExecutionId,
40    pub run_id: RunId,
41    pub metadata: ExecutionMetadata,
42    pub ffqn: FunctionFqn,
43    pub params: Params,
44    pub event_history: Vec<HistoryEvent>,
45    pub responses: Vec<JoinSetResponseEvent>,
46    pub version: Version,
47    pub execution_deadline: DateTime<Utc>,
48    pub can_be_retried: bool,
49    pub worker_span: Span,
50}
51
52#[derive(Debug, thiserror::Error)]
53pub enum WorkerError {
54    // retriable errors
55    // Used by activity worker
56    #[error("activity {trap_kind}: {reason}")]
57    ActivityTrap {
58        reason: String,
59        trap_kind: TrapKind,
60        detail: String,
61        version: Version,
62    },
63    // Used by activity worker, must not be returned when retries are exhausted.
64    #[error("activity returned error")]
65    ActivityReturnedError {
66        detail: Option<String>,
67        version: Version,
68    },
69    /// Workflow trap when `retry_on_trap` is enabled.
70    #[error("workflow trap handled as temporary error: {reason}")]
71    TemporaryWorkflowTrap {
72        reason: String,
73        kind: TrapKind,
74        detail: Option<String>,
75        version: Version,
76    },
77    // Resources are exhausted, retry after a delay as Unlocked, without increasing temporary event count.
78    #[error("limit reached: {reason}")]
79    LimitReached { reason: String, version: Version },
80    // Used by activity worker, best effort. If this is not persisted, the expired timers watcher will append it.
81    #[error("temporary timeout")]
82    TemporaryTimeout,
83    #[error(transparent)]
84    DbError(DbError),
85    // non-retriable errors
86    #[error("fatal error: {0}")]
87    FatalError(FatalError, Version),
88}
89
90#[derive(Debug, thiserror::Error)]
91pub enum FatalError {
92    /// Used by workflow worker when directly called child execution fails.
93    #[error("child finished with an execution error: {child_execution_id}")]
94    UnhandledChildExecutionError {
95        child_execution_id: ExecutionIdDerived,
96        root_cause_id: ExecutionIdDerived,
97    },
98
99    // Used by workflow worker
100    #[error("nondeterminism detected")]
101    NondeterminismDetected { detail: String },
102    // Used by activity worker, workflow worker
103    #[error(transparent)]
104    ParamsParsingError(ParamsParsingError),
105    // Used by activity worker, workflow worker
106    #[error("cannot instantiate: {reason}")]
107    CannotInstantiate { reason: String, detail: String },
108    // Used by activity worker, workflow worker
109    #[error(transparent)]
110    ResultParsingError(ResultParsingError),
111    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
112    #[error("error calling imported function {ffqn} : {reason}")]
113    ImportedFunctionCallError {
114        ffqn: FunctionFqn,
115        reason: StrVariant,
116        detail: Option<String>,
117    },
118
119    /// Workflow trap if `retry_on_trap` is disabled.
120    #[error("workflow {trap_kind}: {reason}")]
121    WorkflowTrap {
122        reason: String,
123        trap_kind: TrapKind,
124        detail: String,
125    },
126    /// Workflow attempted to create a join set with the same name twice.
127    #[error("join set already exists with name `{name}`")]
128    JoinSetNameConflict { name: String },
129}
130
131impl From<FatalError> for FinishedExecutionError {
132    fn from(value: FatalError) -> Self {
133        let reason_full = value.to_string();
134        match value {
135            FatalError::UnhandledChildExecutionError {
136                child_execution_id,
137                root_cause_id,
138            } => FinishedExecutionError::UnhandledChildExecutionError {
139                child_execution_id,
140                root_cause_id,
141            },
142            FatalError::NondeterminismDetected { detail } => {
143                FinishedExecutionError::PermanentFailure {
144                    reason_inner: reason_full.clone(),
145                    reason_full,
146                    kind: PermanentFailureKind::NondeterminismDetected,
147                    detail: Some(detail),
148                }
149            }
150            FatalError::ParamsParsingError(params_parsing_error) => {
151                FinishedExecutionError::PermanentFailure {
152                    reason_inner: reason_full.to_string(),
153                    reason_full,
154                    kind: PermanentFailureKind::ParamsParsingError,
155                    detail: params_parsing_error.detail(),
156                }
157            }
158            FatalError::CannotInstantiate {
159                detail,
160                reason: reason_inner,
161                ..
162            } => FinishedExecutionError::PermanentFailure {
163                reason_inner,
164                reason_full,
165                kind: PermanentFailureKind::CannotInstantiate,
166                detail: Some(detail),
167            },
168            FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
169                reason_inner: reason_full.to_string(),
170                reason_full,
171                kind: PermanentFailureKind::ResultParsingError,
172                detail: None,
173            },
174            FatalError::ImportedFunctionCallError {
175                detail,
176                reason: reason_inner,
177                ..
178            } => FinishedExecutionError::PermanentFailure {
179                reason_inner: reason_inner.to_string(),
180                reason_full,
181                kind: PermanentFailureKind::ImportedFunctionCallError,
182                detail,
183            },
184            FatalError::WorkflowTrap {
185                detail,
186                reason: reason_inner,
187                ..
188            } => FinishedExecutionError::PermanentFailure {
189                reason_inner,
190                reason_full,
191                kind: PermanentFailureKind::WorkflowTrap,
192                detail: Some(detail),
193            },
194            FatalError::JoinSetNameConflict { name } => FinishedExecutionError::PermanentFailure {
195                reason_inner: name,
196                reason_full,
197                kind: PermanentFailureKind::JoinSetNameConflict,
198                detail: None,
199            },
200        }
201    }
202}