apalis_core/
error.rs

1use std::{
2    error::Error as StdError,
3    marker::PhantomData,
4    pin::Pin,
5    task::{Context, Poll},
6    time::Duration,
7};
8use thiserror::Error;
9use tower_service::Service;
10
/// Boxed, type-erased error that is `Send + Sync + 'static`.
///
/// The error types in this module store their underlying cause as this type.
pub type BoxDynError = Box<dyn StdError + Send + Sync + 'static>;
13/// Execution should be aborted
14/// This signifies that the task should not be retried
15#[derive(Error, Debug)]
16#[error("AbortError: {source}")]
17pub struct AbortError {
18    #[source]
19    source: BoxDynError,
20}
21impl AbortError {
22    /// Create a new abort error
23    pub fn new<E: Into<BoxDynError>>(err: E) -> Self {
24        Self { source: err.into() }
25    }
26}
27
28/// Execution should be retried after a specific duration
29/// This increases the attempts
30#[derive(Error, Debug)]
31#[error("RetryError: {source}")]
32pub struct RetryAfterError {
33    #[source]
34    source: BoxDynError,
35    duration: Duration,
36}
37
38impl RetryAfterError {
39    /// Create a new retry after error
40    pub fn new<E: Into<BoxDynError>>(err: E, duration: Duration) -> Self {
41        Self {
42            source: err.into(),
43            duration,
44        }
45    }
46
47    /// Get the duration after which the task should be retried
48    #[must_use]
49    pub fn get_duration(&self) -> Duration {
50        self.duration
51    }
52}
53
54/// Execution should be deferred, will be retried instantly
55#[derive(Error, Debug)]
56#[error("DeferredError: {source}")]
57pub struct DeferredError {
58    #[source]
59    source: BoxDynError,
60}
61
62/// Possible errors that can occur when running a worker.
63#[derive(Error, Debug)]
64pub enum WorkerError {
65    /// An error occurred while consuming the task stream.
66    #[error("Failed to consume task stream: {0}")]
67    StreamError(BoxDynError),
68    /// An error occurred in the worker's heartbeat.
69    #[error("Heartbeat error: {0}")]
70    HeartbeatError(BoxDynError),
71    /// An error occurred while trying to change the state of the worker.
72    #[error("Failed to handle the new state: {0}")]
73    StateError(#[from] WorkerStateError),
74    /// A worker that terminates when .stop was called
75    #[error("Worker stopped and gracefully exited")]
76    GracefulExit,
77    /// A worker panicked and the panic was caught.
78    #[error("Worker panicked: {0}")]
79    PanicError(String),
80    /// An error occurred while handling io
81    #[error("IO error: {0}")]
82    IoError(#[from] std::io::Error),
83}
84
85/// Errors related to worker state transitions
86#[derive(Error, Debug)]
87pub enum WorkerStateError {
88    /// Worker not started
89    #[error("Worker not started, did you forget to call worker.start()")]
90    NotStarted,
91    /// Worker already started
92    #[error("Worker already started")]
93    AlreadyStarted,
94    /// Worker is not running
95    #[error("Worker is not running")]
96    NotRunning,
97    /// Worker is not paused
98    #[error("Worker is not paused")]
99    NotPaused,
100    /// Worker is shutting down
101    #[error("Worker is shutting down")]
102    ShuttingDown,
103    /// Invalid state provided
104    #[error("Worker provided with invalid state {0}")]
105    InvalidState(String),
106}
107
/// Tower layer whose wrapped service reports its errors as a `BoxDynError`.
///
/// Any service error that is `std::error::Error + Send + Sync + 'static` can be
/// boxed this way, which keeps error handling flexible when dealing with trait
/// objects or complex error chains.
#[derive(Debug, Clone)]
pub struct ErrorHandlingLayer {
    // Zero-sized private field; the layer carries no data.
    _p: PhantomData<()>,
}

impl ErrorHandlingLayer {
    /// Returns a new `ErrorHandlingLayer`, identical to the `Default` value.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for ErrorHandlingLayer {
    /// Builds the layer; there is no configuration to set.
    fn default() -> Self {
        Self { _p: PhantomData }
    }
}
130
131impl<S> tower_layer::Layer<S> for ErrorHandlingLayer {
132    type Service = ErrorHandlingService<S>;
133
134    fn layer(&self, service: S) -> Self::Service {
135        ErrorHandlingService { service }
136    }
137}
138
/// Service wrapper produced by `ErrorHandlingLayer`; holds the wrapped service `S`.
#[derive(Debug, Clone)]
pub struct ErrorHandlingService<S> {
    /// The wrapped inner service.
    service: S,
}
144
145impl<S, Request> Service<Request> for ErrorHandlingService<S>
146where
147    S: Service<Request>,
148    S::Error: std::error::Error + Send + Sync + 'static,
149    S::Future: Send + 'static,
150{
151    type Response = S::Response;
152    type Error = BoxDynError;
153    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
154
155    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156        self.service.poll_ready(cx).map_err(|e| {
157            let boxed_error: BoxDynError = e.into();
158            boxed_error
159        })
160    }
161
162    fn call(&mut self, req: Request) -> Self::Future {
163        let fut = self.service.call(req);
164
165        Box::pin(async move {
166            fut.await.map_err(|e| {
167                let boxed_error: BoxDynError = e.into();
168                boxed_error
169            })
170        })
171    }
172}