1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
use std::fmt::Display;
use thiserror::Error;
use tracing::{error, warn};
#[cfg(feature = "derive")]
pub use gasket_derive::*;
pub trait Stage: Sized + Send {
type Unit;
type Worker: Worker<Self>;
fn name(&self) -> &str;
fn metrics(&self) -> crate::metrics::Registry {
Default::default()
}
}
#[derive(Error, Debug)]
pub enum WorkerError {
#[error("error sending work unit through output port")]
Send,
#[error("error receiving work unit through input port")]
Recv,
#[error("operation panic, stage should stop")]
Panic,
#[error("operation requires a restart of the stage")]
Restart,
#[error("operation should be retried")]
Retry,
}
type Result<T> = core::result::Result<T, WorkerError>;
pub trait AsWorkError<T> {
fn or_panic(self) -> Result<T>;
fn or_retry(self) -> Result<T>;
fn or_restart(self) -> Result<T>;
}
impl<T, E> AsWorkError<T> for core::result::Result<T, E>
where
E: Display,
{
fn or_panic(self) -> Result<T> {
match self {
Ok(x) => Ok(x),
Err(x) => {
error!(%x);
Err(WorkerError::Panic)
}
}
}
fn or_retry(self) -> Result<T> {
match self {
Ok(x) => Ok(x),
Err(x) => {
warn!(%x);
Err(WorkerError::Retry)
}
}
}
fn or_restart(self) -> Result<T> {
match self {
Ok(x) => Ok(x),
Err(x) => {
warn!(%x);
Err(WorkerError::Restart)
}
}
}
}
pub enum WorkSchedule<U> {
/// worker is not doing anything, but might in the future
Idle,
/// a work unit should be executed
Unit(U),
/// worker has done all the work it needed
Done,
}
#[async_trait::async_trait(?Send)]
pub trait Worker<S>: Sized
where
S: Stage,
{
/// Bootstrap a new worker
///
/// It's responsible for initializing any resources needed by the worker.
///
/// This future will be cancelled if the stage is requested to shut-down.
/// The implementation of this function needs to be _cancellation
/// safe_. Don't rely on state that crosses await points to avoid potential
/// data loss.
async fn bootstrap(stage: &S) -> Result<Self>;
/// Schedule the next work unit for execution
///
/// This usually means reading messages from input ports and returning a
/// work unit that contains all data required for execution.
///
/// This future will be cancelled if the stage is requested to shut-down.
/// The implementation of this function needs to be _cancellation safe_.
/// Don't rely on state that crosses await points to avoid potential data
/// loss.
async fn schedule(&mut self, stage: &mut S) -> Result<WorkSchedule<S::Unit>>;
/// Execute the action described by the work unit
///
/// This usually means doing required computation, generating side-effect
/// and submitting message through the output ports
///
/// This future will be cancelled if the stage is requested to shut-down.
/// The implementation of this function needs to be _cancellation safe_.
/// Don't rely on state that crosses await points to avoid potential data
/// loss.
async fn execute(&mut self, unit: &S::Unit, stage: &mut S) -> Result<()>;
/// Shutdown the worker gracefully
///
/// This usually means releasing any relevant resources in use by the
/// current worker, either because we need them for a different worker or
/// because the stage is being dismissed.
async fn teardown(&mut self) -> Result<()> {
Ok(())
}
}