obeli_sk_executor/
worker.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::ExecutionIdDerived;
9use concepts::prefixed_ulid::RunId;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{
14    FinishedExecutionError, StrVariant,
15    storage::{DbError, JoinSetResponseEvent},
16};
17use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
18use concepts::{Params, SupportedFunctionReturnValue};
19use tracing::Span;
20
/// A component that can run one execution and report how it ended.
///
/// Implementors must be `Send + Sync + 'static`, so a worker can be shared
/// across threads/tasks.
#[async_trait]
pub trait Worker: Send + Sync + 'static {
    /// Runs the execution described by `ctx` and returns its outcome.
    ///
    /// The context is taken by value. The result is a [`WorkerResult`], which
    /// is `#[must_use]`, so callers are expected to inspect it.
    async fn run(&self, ctx: WorkerContext) -> WorkerResult;

    /// Returns metadata of the functions this worker exports.
    // NOTE(review): presumably these are the functions `run` can be asked to
    // execute - confirm against the executor.
    fn exported_functions(&self) -> &[FunctionMetadata];

    /// Returns metadata of the functions this worker imports.
    fn imported_functions(&self) -> &[FunctionMetadata];
}
29
/// Outcome of a single [`Worker::run`] invocation.
#[must_use]
#[derive(Debug)]
pub enum WorkerResult {
    /// The function finished and produced a return value.
    ///
    /// Carries the return value, a [`Version`] and, optionally, the HTTP
    /// client traces collected during the run.
    Ok(
        SupportedFunctionReturnValue,
        Version,
        Option<Vec<HttpClientTrace>>,
    ),
    /// No value or error is reported back.
    // NOTE(review): judging by the name, the worker has already written the
    // outcome to the database itself - confirm against the executor.
    DbUpdatedByWorker,
    /// The run failed; see [`WorkerError`] for the individual failure modes.
    Err(WorkerError),
}
41
/// Input handed to [`Worker::run`] for a single run of an execution.
#[derive(Debug)]
pub struct WorkerContext {
    /// Identifier of the execution being run.
    pub execution_id: ExecutionId,
    /// Identifier of this particular run.
    pub run_id: RunId,
    /// Metadata attached to the execution.
    pub metadata: ExecutionMetadata,
    /// Fully qualified name of the function to run.
    pub ffqn: FunctionFqn,
    /// Parameters for the function.
    pub params: Params,
    /// History events recorded for the execution so far.
    // NOTE(review): presumably used for replay by workflow workers - confirm.
    pub event_history: Vec<HistoryEvent>,
    /// Join set responses recorded for the execution so far.
    pub responses: Vec<JoinSetResponseEvent>,
    /// Version associated with the execution when the run starts.
    // NOTE(review): looks like the version of the execution log - confirm.
    pub version: Version,
    /// Deadline (UTC) for this run.
    pub execution_deadline: DateTime<Utc>,
    /// Whether a failed run may be retried.
    pub can_be_retried: bool,
    /// Tracing span associated with the worker.
    pub worker_span: Span,
}
56
/// Errors a worker can report through [`WorkerResult::Err`].
///
/// The variants fall into two groups: retriable errors (everything up to and
/// including `DbError`) and the non-retriable `FatalError`.
#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
    // retriable errors
    /// Used by the activity worker when the activity traps.
    #[error("activity {trap_kind}: {reason}")]
    ActivityTrap {
        /// Short reason, included in the error message.
        reason: String,
        /// Kind of the trap, included in the error message.
        trap_kind: TrapKind,
        /// Additional detail; not part of the error message.
        detail: String,
        version: Version,
        /// HTTP client traces collected during the run, if any.
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// Used by the activity worker; must not be returned when retries are exhausted.
    #[error("activity returned error")]
    ActivityReturnedError {
        /// Optional detail; not part of the error message.
        detail: Option<String>,
        version: Version,
        /// HTTP client traces collected during the run, if any.
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// Workflow trap when `retry_on_trap` is enabled.
    #[error("workflow trap handled as temporary error: {reason}")]
    TemporaryWorkflowTrap {
        /// Short reason, included in the error message.
        reason: String,
        /// Kind of the trap; not part of the error message.
        kind: TrapKind,
        /// Optional detail; not part of the error message.
        detail: Option<String>,
        version: Version,
    },
    /// Resources are exhausted; retry after a delay as Unlocked, without
    /// increasing the temporary event count.
    #[error("limit reached: {reason}")]
    LimitReached { reason: String, version: Version },
    /// Used by the activity worker, best effort. If this is not persisted,
    /// the expired timers watcher will append it.
    #[error("temporary timeout")]
    TemporaryTimeout {
        /// HTTP client traces collected during the run, if any.
        http_client_traces: Option<Vec<HttpClientTrace>>,
        version: Version,
    },
    /// Timeout that is left to the watcher to handle; only applicable to workflows.
    #[error("temporary timeout handled by watcher")]
    TemporaryTimeoutHandledByWatcher,
    /// Database error; the message is forwarded from the wrapped [`DbError`].
    #[error(transparent)]
    DbError(DbError),
    // non-retriable errors
    /// Non-retriable failure, paired with a [`Version`].
    #[error("fatal error: {0}")]
    FatalError(FatalError, Version),
}
102
/// Non-retriable failures reported via `WorkerError::FatalError`.
///
/// Each value can be converted into a [`FinishedExecutionError`] through the
/// `From<FatalError>` implementation.
#[derive(Debug, thiserror::Error)]
pub enum FatalError {
    /// Used by workflow worker when directly called child execution fails.
    #[error("child finished with an execution error: {child_execution_id}")]
    UnhandledChildExecutionError {
        /// The child execution that failed; shown in the error message.
        child_execution_id: ExecutionIdDerived,
        /// Execution identified as the root cause; not part of the error message.
        root_cause_id: ExecutionIdDerived,
    },

    /// Used by the workflow worker when nondeterminism is detected.
    #[error("nondeterminism detected")]
    NondeterminismDetected { detail: String },
    /// Used by the activity worker and the workflow worker; the message is
    /// forwarded from the wrapped [`ParamsParsingError`].
    #[error(transparent)]
    ParamsParsingError(ParamsParsingError),
    /// Used by the activity worker and the workflow worker.
    #[error("cannot instantiate: {reason}")]
    CannotInstantiate { reason: String, detail: String },
    /// Used by the activity worker and the workflow worker; the message is
    /// forwarded from the wrapped [`ResultParsingError`].
    #[error(transparent)]
    ResultParsingError(ResultParsingError),
    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
    #[error("error calling imported function {ffqn} : {reason}")]
    ImportedFunctionCallError {
        /// Function that could not be called; shown in the error message.
        ffqn: FunctionFqn,
        /// Short reason; shown in the error message.
        reason: StrVariant,
        /// Optional detail; not part of the error message.
        detail: Option<String>,
    },

    /// Workflow trap if `retry_on_trap` is disabled.
    #[error("workflow {trap_kind}: {reason}")]
    WorkflowTrap {
        /// Short reason; shown in the error message.
        reason: String,
        /// Kind of the trap; shown in the error message.
        trap_kind: TrapKind,
        /// Additional detail; not part of the error message.
        detail: String,
    },
    /// Workflow attempted to create a join set with the same name twice.
    #[error("join set already exists with name `{name}`")]
    JoinSetNameConflict { name: String },
}
143
144impl From<FatalError> for FinishedExecutionError {
145    fn from(value: FatalError) -> Self {
146        let reason_full = value.to_string();
147        match value {
148            FatalError::UnhandledChildExecutionError {
149                child_execution_id,
150                root_cause_id,
151            } => FinishedExecutionError::UnhandledChildExecutionError {
152                child_execution_id,
153                root_cause_id,
154            },
155            FatalError::NondeterminismDetected { detail } => {
156                FinishedExecutionError::PermanentFailure {
157                    reason_inner: reason_full.clone(),
158                    reason_full,
159                    kind: PermanentFailureKind::NondeterminismDetected,
160                    detail: Some(detail),
161                }
162            }
163            FatalError::ParamsParsingError(params_parsing_error) => {
164                FinishedExecutionError::PermanentFailure {
165                    reason_inner: reason_full.to_string(),
166                    reason_full,
167                    kind: PermanentFailureKind::ParamsParsingError,
168                    detail: params_parsing_error.detail(),
169                }
170            }
171            FatalError::CannotInstantiate {
172                detail,
173                reason: reason_inner,
174                ..
175            } => FinishedExecutionError::PermanentFailure {
176                reason_inner,
177                reason_full,
178                kind: PermanentFailureKind::CannotInstantiate,
179                detail: Some(detail),
180            },
181            FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
182                reason_inner: reason_full.to_string(),
183                reason_full,
184                kind: PermanentFailureKind::ResultParsingError,
185                detail: None,
186            },
187            FatalError::ImportedFunctionCallError {
188                detail,
189                reason: reason_inner,
190                ..
191            } => FinishedExecutionError::PermanentFailure {
192                reason_inner: reason_inner.to_string(),
193                reason_full,
194                kind: PermanentFailureKind::ImportedFunctionCallError,
195                detail,
196            },
197            FatalError::WorkflowTrap {
198                detail,
199                reason: reason_inner,
200                ..
201            } => FinishedExecutionError::PermanentFailure {
202                reason_inner,
203                reason_full,
204                kind: PermanentFailureKind::WorkflowTrap,
205                detail: Some(detail),
206            },
207            FatalError::JoinSetNameConflict { name } => FinishedExecutionError::PermanentFailure {
208                reason_inner: name,
209                reason_full,
210                kind: PermanentFailureKind::JoinSetNameConflict,
211                detail: None,
212            },
213        }
214    }
215}