1use std::{
2 error::Error as StdError,
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8};
9use thiserror::Error;
10use tower::Service;
11
12use crate::worker::WorkerError;
13
14pub type BoxDynError = Box<dyn StdError + 'static + Send + Sync>;
16
17#[derive(Error, Debug, Clone)]
19#[non_exhaustive]
20pub enum Error {
21 #[error("FailedError: {0}")]
23 Failed(#[source] Arc<BoxDynError>),
24
25 #[error("AbortError: {0}")]
27 Abort(#[source] Arc<BoxDynError>),
28
29 #[doc(hidden)]
30 #[error("WorkerError: {0}")]
33 WorkerError(WorkerError),
34
35 #[error("MissingDataError: {0}")]
38 MissingData(String),
39
40 #[doc(hidden)]
41 #[error("Encountered an error during service execution")]
44 ServiceError(#[source] Arc<BoxDynError>),
45
46 #[doc(hidden)]
47 #[error("Encountered an error during streaming")]
50 SourceError(#[source] Arc<BoxDynError>),
51}
52
53impl From<BoxDynError> for Error {
54 fn from(err: BoxDynError) -> Self {
55 if let Some(e) = err.downcast_ref::<Error>() {
56 e.clone()
57 } else {
58 Error::Failed(Arc::new(err))
59 }
60 }
61}
62
63#[derive(Clone, Debug)]
73pub struct ErrorHandlingLayer {
74 _p: PhantomData<()>,
75}
76
77impl ErrorHandlingLayer {
78 pub fn new() -> Self {
80 Self { _p: PhantomData }
81 }
82}
83
84impl Default for ErrorHandlingLayer {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90impl<S> tower::layer::Layer<S> for ErrorHandlingLayer {
91 type Service = ErrorHandlingService<S>;
92
93 fn layer(&self, service: S) -> Self::Service {
94 ErrorHandlingService { service }
95 }
96}
97
98#[derive(Clone, Debug)]
100pub struct ErrorHandlingService<S> {
101 service: S,
102}
103
104impl<S, Request> Service<Request> for ErrorHandlingService<S>
105where
106 S: Service<Request>,
107 S::Error: Into<BoxDynError>,
108 S::Future: Send + 'static,
109{
110 type Response = S::Response;
111 type Error = Error;
112 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
113
114 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115 self.service.poll_ready(cx).map_err(|e| {
116 let boxed_error: BoxDynError = e.into();
117 boxed_error.into()
118 })
119 }
120
121 fn call(&mut self, req: Request) -> Self::Future {
122 let fut = self.service.call(req);
123
124 Box::pin(async move {
125 fut.await.map_err(|e| {
126 let boxed_error: BoxDynError = e.into();
127 boxed_error.into()
128 })
129 })
130 }
131}