apalis_core/
error.rs

1use std::{
2    error::Error as StdError,
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    sync::Arc,
7    task::{Context, Poll},
8};
9use thiserror::Error;
10use tower::Service;
11
12use crate::worker::WorkerError;
13
/// Shorthand for a heap-allocated, type-erased error that is `Send`, `Sync` and `'static`.
pub type BoxDynError = Box<dyn StdError + Send + Sync + 'static>;
16
17/// Represents a general error returned by a task or by internals of the platform
18#[derive(Error, Debug, Clone)]
19#[non_exhaustive]
20pub enum Error {
21    /// An error occurred during execution.
22    #[error("FailedError: {0}")]
23    Failed(#[source] Arc<BoxDynError>),
24
25    /// Execution was aborted
26    #[error("AbortError: {0}")]
27    Abort(#[source] Arc<BoxDynError>),
28
29    #[doc(hidden)]
30    /// Encountered an error during worker execution
31    /// This should not be used inside a task function
32    #[error("WorkerError: {0}")]
33    WorkerError(WorkerError),
34
35    /// Missing some data and yet it was requested during execution.
36    /// This should not be used inside a task function
37    #[error("MissingDataError: {0}")]
38    MissingData(String),
39
40    #[doc(hidden)]
41    /// Encountered an error during service execution
42    /// This should not be used inside a task function
43    #[error("Encountered an error during service execution")]
44    ServiceError(#[source] Arc<BoxDynError>),
45
46    #[doc(hidden)]
47    /// Encountered an error during service execution
48    /// This should not be used inside a task function
49    #[error("Encountered an error during streaming")]
50    SourceError(#[source] Arc<BoxDynError>),
51}
52
53impl From<BoxDynError> for Error {
54    fn from(err: BoxDynError) -> Self {
55        if let Some(e) = err.downcast_ref::<Error>() {
56            e.clone()
57        } else {
58            Error::Failed(Arc::new(err))
59        }
60    }
61}
62
/// Tower layer that normalises the errors of a wrapped service into the crate's `Error` type.
///
/// Applying this layer to a service yields an [`ErrorHandlingService`], which intercepts
/// every error the inner service returns. An error that is already an `Error` is passed
/// through as that `Error`; anything else ends up wrapped in `Error::Failed`.
///
/// The inner service's error type has to implement `Into<BoxDynError>`, which keeps the
/// layer usable with trait-object errors and longer error chains.
#[derive(Debug, Clone)]
pub struct ErrorHandlingLayer {
    // Zero-sized placeholder: the layer itself stores no configuration.
    _p: PhantomData<()>,
}
76
77impl ErrorHandlingLayer {
78    /// Create a new ErrorHandlingLayer
79    pub fn new() -> Self {
80        Self { _p: PhantomData }
81    }
82}
83
84impl Default for ErrorHandlingLayer {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90impl<S> tower::layer::Layer<S> for ErrorHandlingLayer {
91    type Service = ErrorHandlingService<S>;
92
93    fn layer(&self, service: S) -> Self::Service {
94        ErrorHandlingService { service }
95    }
96}
97
/// The service produced by `ErrorHandlingLayer`; it owns the wrapped inner service.
#[derive(Debug, Clone)]
pub struct ErrorHandlingService<S> {
    // The inner service that requests are delegated to.
    service: S,
}
103
104impl<S, Request> Service<Request> for ErrorHandlingService<S>
105where
106    S: Service<Request>,
107    S::Error: Into<BoxDynError>,
108    S::Future: Send + 'static,
109{
110    type Response = S::Response;
111    type Error = Error;
112    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
113
114    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115        self.service.poll_ready(cx).map_err(|e| {
116            let boxed_error: BoxDynError = e.into();
117            boxed_error.into()
118        })
119    }
120
121    fn call(&mut self, req: Request) -> Self::Future {
122        let fut = self.service.call(req);
123
124        Box::pin(async move {
125            fut.await.map_err(|e| {
126                let boxed_error: BoxDynError = e.into();
127                boxed_error.into()
128            })
129        })
130    }
131}