obeli_sk_executor/
worker.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::PermanentFailureKind;
7use concepts::TrapKind;
8use concepts::prefixed_ulid::ExecutionIdDerived;
9use concepts::prefixed_ulid::RunId;
10use concepts::storage::HistoryEvent;
11use concepts::storage::Version;
12use concepts::storage::http_client_trace::HttpClientTrace;
13use concepts::{
14    FinishedExecutionError, StrVariant,
15    storage::{DbError, JoinSetResponseEvent},
16};
17use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
18use concepts::{Params, SupportedFunctionReturnValue};
19use tracing::Span;
20
21#[async_trait]
22pub trait Worker: Send + Sync + 'static {
23    async fn run(&self, ctx: WorkerContext) -> WorkerResult;
24
25    // List exported functions without extensions.
26    // Used by executor.
27    // TODO: Rename to `exported_functions_noext`
28    fn exported_functions(&self) -> &[FunctionMetadata];
29}
30
31#[must_use]
32#[derive(Debug)]
33pub enum WorkerResult {
34    Ok(
35        SupportedFunctionReturnValue,
36        Version,
37        Option<Vec<HttpClientTrace>>,
38    ),
39    DbUpdatedByWorkerOrWatcher,
40    Err(WorkerError),
41}
42
43#[derive(Debug)]
44pub struct WorkerContext {
45    pub execution_id: ExecutionId,
46    pub run_id: RunId,
47    pub metadata: ExecutionMetadata,
48    pub ffqn: FunctionFqn,
49    pub params: Params,
50    pub event_history: Vec<HistoryEvent>,
51    pub responses: Vec<JoinSetResponseEvent>,
52    pub version: Version,
53    pub execution_deadline: DateTime<Utc>,
54    pub can_be_retried: bool,
55    pub worker_span: Span,
56}
57
58#[derive(Debug, thiserror::Error)]
59pub enum WorkerError {
60    // retriable errors
61    // Used by activity worker
62    #[error("activity {trap_kind}: {reason}")]
63    ActivityTrap {
64        reason: String,
65        trap_kind: TrapKind,
66        detail: String,
67        version: Version,
68        http_client_traces: Option<Vec<HttpClientTrace>>,
69    },
70    // Used by activity worker, must not be returned when retries are exhausted.
71    #[error("activity returned error")]
72    ActivityReturnedError {
73        detail: Option<String>,
74        version: Version,
75        http_client_traces: Option<Vec<HttpClientTrace>>,
76    },
77    /// Workflow trap when `retry_on_trap` is enabled.
78    #[error("workflow trap handled as temporary error: {reason}")]
79    TemporaryWorkflowTrap {
80        reason: String,
81        kind: TrapKind,
82        detail: Option<String>,
83        version: Version,
84    },
85    // Resources are exhausted, retry after a delay as Unlocked, without increasing temporary event count.
86    #[error("limit reached: {reason}")]
87    LimitReached { reason: String, version: Version },
88    // Used by activity worker, best effort. If this is not persisted, the expired timers watcher will append it.
89    #[error("temporary timeout")]
90    TemporaryTimeout {
91        http_client_traces: Option<Vec<HttpClientTrace>>,
92        version: Version,
93    },
94    #[error(transparent)]
95    DbError(DbError),
96    // non-retriable errors
97    #[error("fatal error: {0}")]
98    FatalError(FatalError, Version),
99}
100
101#[derive(Debug, thiserror::Error)]
102pub enum FatalError {
103    /// Used by workflow worker when directly called child execution fails.
104    #[error("child finished with an execution error: {child_execution_id}")]
105    UnhandledChildExecutionError {
106        child_execution_id: ExecutionIdDerived,
107        root_cause_id: ExecutionIdDerived,
108    },
109
110    // Used by workflow worker
111    #[error("nondeterminism detected")]
112    NondeterminismDetected { detail: String },
113    // Used by activity worker, workflow worker
114    #[error(transparent)]
115    ParamsParsingError(ParamsParsingError),
116    // Used by activity worker, workflow worker
117    #[error("cannot instantiate: {reason}")]
118    CannotInstantiate { reason: String, detail: String },
119    // Used by activity worker, workflow worker
120    #[error(transparent)]
121    ResultParsingError(ResultParsingError),
122    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
123    #[error("error calling imported function {ffqn} : {reason}")]
124    ImportedFunctionCallError {
125        ffqn: FunctionFqn,
126        reason: StrVariant,
127        detail: Option<String>,
128    },
129
130    /// Workflow trap if `retry_on_trap` is disabled.
131    #[error("workflow {trap_kind}: {reason}")]
132    WorkflowTrap {
133        reason: String,
134        trap_kind: TrapKind,
135        detail: String,
136    },
137    /// Workflow attempted to create a join set with the same name twice.
138    #[error("join set already exists with name `{name}`")]
139    JoinSetNameConflict { name: String },
140}
141
142impl From<FatalError> for FinishedExecutionError {
143    fn from(value: FatalError) -> Self {
144        let reason_full = value.to_string();
145        match value {
146            FatalError::UnhandledChildExecutionError {
147                child_execution_id,
148                root_cause_id,
149            } => FinishedExecutionError::UnhandledChildExecutionError {
150                child_execution_id,
151                root_cause_id,
152            },
153            FatalError::NondeterminismDetected { detail } => {
154                FinishedExecutionError::PermanentFailure {
155                    reason_inner: reason_full.clone(),
156                    reason_full,
157                    kind: PermanentFailureKind::NondeterminismDetected,
158                    detail: Some(detail),
159                }
160            }
161            FatalError::ParamsParsingError(params_parsing_error) => {
162                FinishedExecutionError::PermanentFailure {
163                    reason_inner: reason_full.to_string(),
164                    reason_full,
165                    kind: PermanentFailureKind::ParamsParsingError,
166                    detail: params_parsing_error.detail(),
167                }
168            }
169            FatalError::CannotInstantiate {
170                detail,
171                reason: reason_inner,
172                ..
173            } => FinishedExecutionError::PermanentFailure {
174                reason_inner,
175                reason_full,
176                kind: PermanentFailureKind::CannotInstantiate,
177                detail: Some(detail),
178            },
179            FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
180                reason_inner: reason_full.to_string(),
181                reason_full,
182                kind: PermanentFailureKind::ResultParsingError,
183                detail: None,
184            },
185            FatalError::ImportedFunctionCallError {
186                detail,
187                reason: reason_inner,
188                ..
189            } => FinishedExecutionError::PermanentFailure {
190                reason_inner: reason_inner.to_string(),
191                reason_full,
192                kind: PermanentFailureKind::ImportedFunctionCallError,
193                detail,
194            },
195            FatalError::WorkflowTrap {
196                detail,
197                reason: reason_inner,
198                ..
199            } => FinishedExecutionError::PermanentFailure {
200                reason_inner,
201                reason_full,
202                kind: PermanentFailureKind::WorkflowTrap,
203                detail: Some(detail),
204            },
205            FatalError::JoinSetNameConflict { name } => FinishedExecutionError::PermanentFailure {
206                reason_inner: name,
207                reason_full,
208                kind: PermanentFailureKind::JoinSetNameConflict,
209                detail: None,
210            },
211        }
212    }
213}