1use std::{
2 error::Error as StdError,
3 marker::PhantomData,
4 pin::Pin,
5 task::{Context, Poll},
6 time::Duration,
7};
8use thiserror::Error;
9use tower_service::Service;
10
/// Boxed, type-erased error that is `Send + Sync + 'static`.
///
/// Every wrapper type in this file stores its underlying cause in this form.
pub type BoxDynError = Box<dyn StdError + Send + Sync + 'static>;
13#[derive(Error, Debug)]
16#[error("AbortError: {source}")]
17pub struct AbortError {
18 #[source]
19 source: BoxDynError,
20}
21impl AbortError {
22 pub fn new<E: Into<BoxDynError>>(err: E) -> Self {
24 Self { source: err.into() }
25 }
26}
27
28#[derive(Error, Debug)]
31#[error("RetryError: {source}")]
32pub struct RetryAfterError {
33 #[source]
34 source: BoxDynError,
35 duration: Duration,
36}
37
38impl RetryAfterError {
39 pub fn new<E: Into<BoxDynError>>(err: E, duration: Duration) -> Self {
41 Self {
42 source: err.into(),
43 duration,
44 }
45 }
46
47 #[must_use]
49 pub fn get_duration(&self) -> Duration {
50 self.duration
51 }
52}
53
54#[derive(Error, Debug)]
56#[error("DeferredError: {source}")]
57pub struct DeferredError {
58 #[source]
59 source: BoxDynError,
60}
61
62#[derive(Error, Debug)]
64pub enum WorkerError {
65 #[error("Failed to consume task stream: {0}")]
67 StreamError(BoxDynError),
68 #[error("Heartbeat error: {0}")]
70 HeartbeatError(BoxDynError),
71 #[error("Failed to handle the new state: {0}")]
73 StateError(#[from] WorkerStateError),
74 #[error("Worker stopped and gracefully exited")]
76 GracefulExit,
77 #[error("Worker panicked: {0}")]
79 PanicError(String),
80 #[error("IO error: {0}")]
82 IoError(#[from] std::io::Error),
83}
84
85#[derive(Error, Debug)]
87pub enum WorkerStateError {
88 #[error("Worker not started, did you forget to call worker.start()")]
90 NotStarted,
91 #[error("Worker already started")]
93 AlreadyStarted,
94 #[error("Worker is not running")]
96 NotRunning,
97 #[error("Worker is not paused")]
99 NotPaused,
100 #[error("Worker is shutting down")]
102 ShuttingDown,
103 #[error("Worker provided with invalid state {0}")]
105 InvalidState(String),
106}
107
/// Layer that wraps a service in [`ErrorHandlingService`].
///
/// The layer carries no configuration; all instances are equivalent.
#[derive(Debug, Clone, Default)]
pub struct ErrorHandlingLayer {
    // Zero-sized private field, so the struct cannot be built with a struct
    // literal from outside this module.
    _p: PhantomData<()>,
}

impl ErrorHandlingLayer {
    /// Creates a new layer; equivalent to [`Default::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
130
131impl<S> tower_layer::Layer<S> for ErrorHandlingLayer {
132 type Service = ErrorHandlingService<S>;
133
134 fn layer(&self, service: S) -> Self::Service {
135 ErrorHandlingService { service }
136 }
137}
138
/// Service wrapper produced by `ErrorHandlingLayer`.
///
/// Its `Service` implementation forwards to the wrapped service and converts
/// the wrapped service's errors into `BoxDynError`.
#[derive(Debug, Clone)]
pub struct ErrorHandlingService<S> {
    /// The wrapped inner service.
    service: S,
}
144
145impl<S, Request> Service<Request> for ErrorHandlingService<S>
146where
147 S: Service<Request>,
148 S::Error: std::error::Error + Send + Sync + 'static,
149 S::Future: Send + 'static,
150{
151 type Response = S::Response;
152 type Error = BoxDynError;
153 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
154
155 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156 self.service.poll_ready(cx).map_err(|e| {
157 let boxed_error: BoxDynError = e.into();
158 boxed_error
159 })
160 }
161
162 fn call(&mut self, req: Request) -> Self::Future {
163 let fut = self.service.call(req);
164
165 Box::pin(async move {
166 fut.await.map_err(|e| {
167 let boxed_error: BoxDynError = e.into();
168 boxed_error
169 })
170 })
171 }
172}