1use std::{
2 error::Error as StdError, marker::PhantomData, pin::Pin, task::{Context, Poll}, time::Duration
3};
4use thiserror::Error;
5use tower_service::Service;
6
7pub type BoxDynError = Box<dyn StdError + 'static + Send + Sync>;
9#[derive(Error, Debug)]
12#[error("AbortError: {source}")]
13pub struct AbortError {
14 #[source]
15 source: BoxDynError,
16}
17impl AbortError {
18 pub fn new<E: Into<BoxDynError>>(err: E) -> Self {
20 AbortError { source: err.into() }
21 }
22}
23
24#[derive(Error, Debug)]
27#[error("RetryError: {source}")]
28pub struct RetryAfterError {
29 #[source]
30 source: BoxDynError,
31 duration: Duration,
32}
33
34impl RetryAfterError {
35 pub fn new<E: Into<BoxDynError>>(err: E, duration: Duration) -> Self {
37 RetryAfterError {
38 source: err.into(),
39 duration,
40 }
41 }
42
43 pub fn get_duration(&self) -> Duration {
45 self.duration
46 }
47}
48
49#[derive(Error, Debug)]
51#[error("DeferredError: {source}")]
52pub struct DeferredError {
53 #[source]
54 source: BoxDynError,
55}
56
57#[derive(Error, Debug)]
59pub enum WorkerError {
60 #[error("Failed to consume task stream: {0}")]
62 StreamError(BoxDynError),
63 #[error("Heartbeat error: {0}")]
65 HeartbeatError(BoxDynError),
66 #[error("Failed to handle the new state: {0}")]
68 StateError(#[from] WorkerStateError),
69 #[error("Worker stopped and gracefully exited")]
71 GracefulExit,
72 #[error("Worker panicked: {0}")]
74 PanicError(String),
75 #[error("IO error: {0}")]
77 IoError(#[from] std::io::Error),
78}
79
80#[derive(Error, Debug)]
82pub enum WorkerStateError {
83 #[error("Worker not started, did you forget to call worker.start()")]
85 NotStarted,
86 #[error("Worker already started")]
88 AlreadyStarted,
89 #[error("Worker is not running")]
91 NotRunning,
92 #[error("Worker is not paused")]
94 NotPaused,
95 #[error("Worker is shutting down")]
97 ShuttingDown,
98 #[error("Worker provided with invalid state {0}")]
100 InvalidState(String),
101}
102
103#[derive(Clone, Debug)]
108pub struct ErrorHandlingLayer {
109 _p: PhantomData<()>,
110}
111
112impl ErrorHandlingLayer {
113 pub fn new() -> Self {
115 Self { _p: PhantomData }
116 }
117}
118
119impl Default for ErrorHandlingLayer {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125impl<S> tower_layer::Layer<S> for ErrorHandlingLayer {
126 type Service = ErrorHandlingService<S>;
127
128 fn layer(&self, service: S) -> Self::Service {
129 ErrorHandlingService { service }
130 }
131}
132
133#[derive(Clone, Debug)]
135pub struct ErrorHandlingService<S> {
136 service: S,
137}
138
139impl<S, Request> Service<Request> for ErrorHandlingService<S>
140where
141 S: Service<Request>,
142 S::Error: std::error::Error + Send + Sync + 'static,
143 S::Future: Send + 'static,
144{
145 type Response = S::Response;
146 type Error = BoxDynError;
147 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
148
149 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150 self.service.poll_ready(cx).map_err(|e| {
151 let boxed_error: BoxDynError = e.into();
152 boxed_error
153 })
154 }
155
156 fn call(&mut self, req: Request) -> Self::Future {
157 let fut = self.service.call(req);
158
159 Box::pin(async move {
160 fut.await.map_err(|e| {
161 let boxed_error: BoxDynError = e.into();
162 boxed_error
163 })
164 })
165 }
166}