apalis_core/
error.rs

1use std::{
2    error::Error as StdError, marker::PhantomData, pin::Pin, task::{Context, Poll}, time::Duration
3};
4use thiserror::Error;
5use tower_service::Service;
6
/// Shorthand for a boxed, type-erased error that is `Send + Sync + 'static`.
pub type BoxDynError = Box<dyn StdError + Send + Sync + 'static>;
9/// Execution should be aborted
10/// This signifies that the task should not be retried
11#[derive(Error, Debug)]
12#[error("AbortError: {source}")]
13pub struct AbortError {
14    #[source]
15    source: BoxDynError,
16}
17impl AbortError {
18    /// Create a new abort error
19    pub fn new<E: Into<BoxDynError>>(err: E) -> Self {
20        AbortError { source: err.into() }
21    }
22}
23
24/// Execution should be retried after a specific duration
25/// This increases the attempts
26#[derive(Error, Debug)]
27#[error("RetryError: {source}")]
28pub struct RetryAfterError {
29    #[source]
30    source: BoxDynError,
31    duration: Duration,
32}
33
34impl RetryAfterError {
35    /// Create a new retry after error
36    pub fn new<E: Into<BoxDynError>>(err: E, duration: Duration) -> Self {
37        RetryAfterError {
38            source: err.into(),
39            duration,
40        }
41    }
42
43    /// Get the duration after which the task should be retried
44    pub fn get_duration(&self) -> Duration {
45        self.duration
46    }
47}
48
49/// Execution should be deferred, will be retried instantly
50#[derive(Error, Debug)]
51#[error("DeferredError: {source}")]
52pub struct DeferredError {
53    #[source]
54    source: BoxDynError,
55}
56
57/// Possible errors that can occur when running a worker.
58#[derive(Error, Debug)]
59pub enum WorkerError {
60    /// An error occurred while consuming the task stream.
61    #[error("Failed to consume task stream: {0}")]
62    StreamError(BoxDynError),
63    /// An error occurred in the worker's heartbeat.
64    #[error("Heartbeat error: {0}")]
65    HeartbeatError(BoxDynError),
66    /// An error occurred while trying to change the state of the worker.
67    #[error("Failed to handle the new state: {0}")]
68    StateError(#[from] WorkerStateError),
69    /// A worker that terminates when .stop was called
70    #[error("Worker stopped and gracefully exited")]
71    GracefulExit,
72    /// A worker panicked and the panic was caught.
73    #[error("Worker panicked: {0}")]
74    PanicError(String),
75    /// An error occurred while handling io
76    #[error("IO error: {0}")]
77    IoError(#[from] std::io::Error),
78}
79
80/// Errors related to worker state transitions
81#[derive(Error, Debug)]
82pub enum WorkerStateError {
83    /// Worker not started
84    #[error("Worker not started, did you forget to call worker.start()")]
85    NotStarted,
86    /// Worker already started
87    #[error("Worker already started")]
88    AlreadyStarted,
89    /// Worker is not running
90    #[error("Worker is not running")]
91    NotRunning,
92    /// Worker is not paused
93    #[error("Worker is not paused")]
94    NotPaused,
95    /// Worker is shutting down
96    #[error("Worker is shutting down")]
97    ShuttingDown,
98    /// Invalid state provided
99    #[error("Worker provided with invalid state {0}")]
100    InvalidState(String),
101}
102
/// Tower layer that converts a service's errors into a [`BoxDynError`].
///
/// The wrapped service's error type has to implement `std::error::Error`,
/// which keeps error handling flexible when trait objects or long error
/// chains are involved.
#[derive(Debug, Clone)]
pub struct ErrorHandlingLayer {
    // Zero-sized marker; the layer carries no data.
    _p: PhantomData<()>,
}
111
112impl ErrorHandlingLayer {
113    /// Create a new ErrorHandlingLayer
114    pub fn new() -> Self {
115        Self { _p: PhantomData }
116    }
117}
118
119impl Default for ErrorHandlingLayer {
120    fn default() -> Self {
121        Self::new()
122    }
123}
124
125impl<S> tower_layer::Layer<S> for ErrorHandlingLayer {
126    type Service = ErrorHandlingService<S>;
127
128    fn layer(&self, service: S) -> Self::Service {
129        ErrorHandlingService { service }
130    }
131}
132
/// Service produced by `ErrorHandlingLayer`; holds the wrapped inner service.
#[derive(Debug, Clone)]
pub struct ErrorHandlingService<S> {
    /// The inner service being wrapped.
    service: S,
}
138
139impl<S, Request> Service<Request> for ErrorHandlingService<S>
140where
141    S: Service<Request>,
142    S::Error: std::error::Error + Send + Sync + 'static,
143    S::Future: Send + 'static,
144{
145    type Response = S::Response;
146    type Error = BoxDynError;
147    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
148
149    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150        self.service.poll_ready(cx).map_err(|e| {
151            let boxed_error: BoxDynError = e.into();
152            boxed_error
153        })
154    }
155
156    fn call(&mut self, req: Request) -> Self::Future {
157        let fut = self.service.call(req);
158
159        Box::pin(async move {
160            fut.await.map_err(|e| {
161                let boxed_error: BoxDynError = e.into();
162                boxed_error
163            })
164        })
165    }
166}