obeli_sk_executor/
worker.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use concepts::prefixed_ulid::ExecutionIdDerived;
use concepts::prefixed_ulid::RunId;
use concepts::storage::HistoryEvent;
use concepts::storage::Version;
use concepts::ExecutionId;
use concepts::ExecutionMetadata;
use concepts::FunctionMetadata;
use concepts::PermanentFailureKind;
use concepts::TrapKind;
use concepts::{
    storage::{DbError, JoinSetResponseEvent},
    FinishedExecutionError, StrVariant,
};
use concepts::{FunctionFqn, ParamsParsingError, ResultParsingError};
use concepts::{Params, SupportedFunctionReturnValue};
use tracing::Span;

#[async_trait]
pub trait Worker: Send + Sync + 'static {
    async fn run(&self, ctx: WorkerContext) -> WorkerResult;

    fn exported_functions(&self) -> &[FunctionMetadata];

    fn imported_functions(&self) -> &[FunctionMetadata];
}

#[must_use]
#[derive(Debug)]
pub enum WorkerResult {
    Ok(SupportedFunctionReturnValue, Version),
    DbUpdatedByWorker,
    Err(WorkerError),
}

#[derive(Debug)]
pub struct WorkerContext {
    pub execution_id: ExecutionId,
    pub run_id: RunId,
    pub metadata: ExecutionMetadata,
    pub ffqn: FunctionFqn,
    pub params: Params,
    pub event_history: Vec<HistoryEvent>,
    pub responses: Vec<JoinSetResponseEvent>,
    pub version: Version,
    pub execution_deadline: DateTime<Utc>,
    pub can_be_retried: bool,
    pub worker_span: Span,
}

#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
    // retriable errors
    // Used by activity worker
    #[error("activity {trap_kind}: {reason}")]
    ActivityTrap {
        reason: String,
        trap_kind: TrapKind,
        detail: String,
        version: Version,
    },
    // Used by activity worker, must not be returned when retries are exhausted.
    #[error("activity returned error")]
    ActivityReturnedError {
        detail: Option<String>,
        version: Version,
    },
    /// Workflow trap when `retry_on_trap` is enabled.
    #[error("workflow trap handled as temporary error: {reason}")]
    TemporaryWorkflowTrap {
        reason: String,
        kind: TrapKind,
        detail: Option<String>,
        version: Version,
    },
    // Resources are exhausted, retry after a delay as Unlocked, without increasing temporary event count.
    #[error("limit reached: {reason}")]
    LimitReached { reason: String, version: Version },
    // Used by activity worker, best effort. If this is not persisted, the expired timers watcher will append it.
    #[error("temporary timeout")]
    TemporaryTimeout,
    #[error(transparent)]
    DbError(DbError),
    // non-retriable errors
    #[error("fatal error: {0}")]
    FatalError(FatalError, Version),
}

#[derive(Debug, thiserror::Error)]
pub enum FatalError {
    /// Used by workflow worker when directly called child execution fails.
    #[error("child finished with an execution error: {child_execution_id}")]
    UnhandledChildExecutionError {
        child_execution_id: ExecutionIdDerived,
        root_cause_id: ExecutionIdDerived,
    },

    // Used by workflow worker
    #[error("nondeterminism detected")]
    NondeterminismDetected { detail: String },
    // Used by activity worker, workflow worker
    #[error(transparent)]
    ParamsParsingError(ParamsParsingError),
    // Used by activity worker, workflow worker
    #[error("cannot instantiate: {reason}")]
    CannotInstantiate { reason: String, detail: String },
    // Used by activity worker, workflow worker
    #[error(transparent)]
    ResultParsingError(ResultParsingError),
    /// Used when workflow cannot call an imported function, either a child execution or a function from workflow-support.
    #[error("error calling imported function {ffqn} : {reason}")]
    ImportedFunctionCallError {
        ffqn: FunctionFqn,
        reason: StrVariant,
        detail: Option<String>,
    },

    /// Workflow trap if `retry_on_trap` is disabled.
    #[error("workflow {trap_kind}: {reason}")]
    WorkflowTrap {
        reason: String,
        trap_kind: TrapKind,
        detail: String,
    },
    /// Workflow attempted to create a join set with the same name twice.
    #[error("join set already exists with name `{name}`")]
    JoinSetNameConflict { name: String },
}

impl From<FatalError> for FinishedExecutionError {
    fn from(value: FatalError) -> Self {
        let reason_full = value.to_string();
        match value {
            FatalError::UnhandledChildExecutionError {
                child_execution_id,
                root_cause_id,
            } => FinishedExecutionError::UnhandledChildExecutionError {
                child_execution_id,
                root_cause_id,
            },
            FatalError::NondeterminismDetected { detail } => {
                FinishedExecutionError::PermanentFailure {
                    reason_inner: reason_full.clone(),
                    reason_full,
                    kind: PermanentFailureKind::NondeterminismDetected,
                    detail: Some(detail),
                }
            }
            FatalError::ParamsParsingError(params_parsing_error) => {
                FinishedExecutionError::PermanentFailure {
                    reason_inner: reason_full.to_string(),
                    reason_full,
                    kind: PermanentFailureKind::ParamsParsingError,
                    detail: params_parsing_error.detail(),
                }
            }
            FatalError::CannotInstantiate {
                detail,
                reason: reason_inner,
                ..
            } => FinishedExecutionError::PermanentFailure {
                reason_inner,
                reason_full,
                kind: PermanentFailureKind::CannotInstantiate,
                detail: Some(detail),
            },
            FatalError::ResultParsingError(_) => FinishedExecutionError::PermanentFailure {
                reason_inner: reason_full.to_string(),
                reason_full,
                kind: PermanentFailureKind::ResultParsingError,
                detail: None,
            },
            FatalError::ImportedFunctionCallError {
                detail,
                reason: reason_inner,
                ..
            } => FinishedExecutionError::PermanentFailure {
                reason_inner: reason_inner.to_string(),
                reason_full,
                kind: PermanentFailureKind::ImportedFunctionCallError,
                detail,
            },
            FatalError::WorkflowTrap {
                detail,
                reason: reason_inner,
                ..
            } => FinishedExecutionError::PermanentFailure {
                reason_inner,
                reason_full,
                kind: PermanentFailureKind::WorkflowTrap,
                detail: Some(detail),
            },
            FatalError::JoinSetNameConflict { name } => FinishedExecutionError::PermanentFailure {
                reason_inner: name,
                reason_full,
                kind: PermanentFailureKind::JoinSetNameConflict,
                detail: None,
            },
        }
    }
}