obeli_sk_executor/
worker.rs

1use async_trait::async_trait;
2use concepts::ExecutionFailureKind;
3use concepts::ExecutionId;
4use concepts::ExecutionMetadata;
5use concepts::FunctionMetadata;
6use concepts::TrapKind;
7use concepts::storage::DbErrorWrite;
8use concepts::storage::HistoryEvent;
9use concepts::storage::Locked;
10use concepts::storage::Version;
11use concepts::storage::http_client_trace::HttpClientTrace;
12use concepts::{FinishedExecutionError, StrVariant, storage::JoinSetResponseEvent};
13use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
14use concepts::{Params, SupportedFunctionReturnValue};
15use tracing::Span;
16
17#[async_trait]
18pub trait Worker: Send + Sync + 'static {
19    async fn run(&self, ctx: WorkerContext) -> WorkerResult;
20
21    // List exported functions without extensions.
22    // Used by executor.
23    // TODO: Rename to `exported_functions_noext`
24    fn exported_functions(&self) -> &[FunctionMetadata];
25}
26
27#[must_use]
28#[derive(Debug)]
29pub enum WorkerResult {
30    Ok(
31        SupportedFunctionReturnValue,
32        Version,
33        Option<Vec<HttpClientTrace>>,
34    ),
35    // If no write occured, the watcher will timeout the execution and retry after backoff which avoids the busy loop
36    DbUpdatedByWorkerOrWatcher,
37    Err(WorkerError),
38}
39
40#[derive(Debug)]
41pub struct WorkerContext {
42    pub execution_id: ExecutionId,
43    pub metadata: ExecutionMetadata,
44    pub ffqn: FunctionFqn,
45    pub params: Params,
46    pub event_history: Vec<(HistoryEvent, Version)>,
47    pub responses: Vec<JoinSetResponseEvent>,
48    pub version: Version,
49    pub can_be_retried: bool,
50    pub worker_span: Span,
51    pub locked_event: Locked,
52}
53
54#[derive(Debug, thiserror::Error)]
55pub enum WorkerError {
56    // retriable errors
57    // Used by activity worker
58    #[error("activity {trap_kind}: {reason}")]
59    ActivityTrap {
60        reason: String,
61        trap_kind: TrapKind,
62        detail: Option<String>,
63        version: Version,
64        http_client_traces: Option<Vec<HttpClientTrace>>,
65    },
66    #[error("{reason}")]
67    ActivityPreopenedDirError {
68        reason: String,
69        detail: String,
70        version: Version,
71    },
72    // Used by activity worker, must not be returned when retries are exhausted.
73    #[error("activity returned error")]
74    ActivityReturnedError {
75        detail: Option<String>,
76        version: Version,
77        http_client_traces: Option<Vec<HttpClientTrace>>,
78    },
79    // Resources are exhausted, retry after a delay as Unlocked, without increasing temporary event count.
80    #[error("limit reached: {reason}")]
81    LimitReached { reason: String, version: Version },
82    // Used by activity worker, best effort. If this is not persisted, the expired timers watcher will append it.
83    #[error("temporary timeout")]
84    TemporaryTimeout {
85        http_client_traces: Option<Vec<HttpClientTrace>>,
86        version: Version,
87    },
88    #[error(transparent)]
89    DbError(DbErrorWrite),
90    // non-retriable errors
91    #[error("fatal error: {0}")]
92    FatalError(FatalError, Version),
93}
94
95#[derive(Debug, thiserror::Error)]
96pub enum FatalError {
97    // Used by workflow worker
98    #[error("nondeterminism detected")]
99    NondeterminismDetected { detail: String },
100    // Used by activity worker, workflow worker
101    #[error(transparent)]
102    ParamsParsingError(ParamsParsingError),
103    // Used by activity worker, workflow worker
104    #[error("cannot instantiate: {reason}")]
105    CannotInstantiate { reason: String, detail: String },
106    // Used by activity worker, workflow worker
107    #[error(transparent)]
108    ResultParsingError(ResultParsingError),
109    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
110    #[error("error calling imported function {ffqn} : {reason}")]
111    ImportedFunctionCallError {
112        ffqn: FunctionFqn,
113        reason: StrVariant,
114        detail: Option<String>,
115    },
116    /// Workflow trap if `retry_on_trap` is disabled.
117    #[error("workflow {trap_kind}: {reason}")]
118    WorkflowTrap {
119        reason: String,
120        trap_kind: TrapKind,
121        detail: Option<String>,
122    },
123    #[error("out of fuel: {reason}")]
124    OutOfFuel { reason: String },
125    #[error("constraint violation: {reason}")]
126    ConstraintViolation { reason: StrVariant },
127
128    #[error("cancelled")]
129    Cancelled,
130}
131
132impl From<FatalError> for FinishedExecutionError {
133    fn from(err: FatalError) -> Self {
134        let reason_generic = err.to_string(); // Override with err's reason if no information is lost.
135        match err {
136            FatalError::NondeterminismDetected { detail } => FinishedExecutionError {
137                reason: None,
138                kind: ExecutionFailureKind::NondeterminismDetected,
139                detail: Some(detail),
140            },
141            FatalError::OutOfFuel { reason } => FinishedExecutionError {
142                reason: Some(reason),
143                kind: ExecutionFailureKind::OutOfFuel,
144                detail: None,
145            },
146            FatalError::ParamsParsingError(err) => FinishedExecutionError {
147                reason: Some(reason_generic),
148                kind: ExecutionFailureKind::Uncategorized,
149                detail: err.detail(),
150            },
151            FatalError::CannotInstantiate { reason: _, detail } => FinishedExecutionError {
152                reason: Some(reason_generic),
153                kind: ExecutionFailureKind::Uncategorized,
154                detail: Some(detail),
155            },
156            FatalError::ResultParsingError(_) | FatalError::ConstraintViolation { reason: _ } => {
157                FinishedExecutionError {
158                    reason: Some(reason_generic),
159                    kind: ExecutionFailureKind::Uncategorized,
160                    detail: None,
161                }
162            }
163            FatalError::ImportedFunctionCallError { detail, .. }
164            | FatalError::WorkflowTrap { detail, .. } => FinishedExecutionError {
165                reason: Some(reason_generic),
166                kind: ExecutionFailureKind::Uncategorized,
167                detail,
168            },
169            FatalError::Cancelled => FinishedExecutionError {
170                kind: ExecutionFailureKind::Cancelled,
171                reason: None,
172                detail: None,
173            },
174        }
175    }
176}