Skip to main content

prosa/core/
error.rs

1use std::time::Duration;
2
3use super::{msg::InternalMsg, queue::SendError};
4use prosa_utils::{
5    config::ConfigError,
6    msg::tvf::{Tvf, TvfError},
7};
8
9/// Processor error
10pub trait ProcError: std::error::Error {
11    /// Method to know if the processor can be restart because the error is temporary and can be recover
12    fn recoverable(&self) -> bool;
13    /// Method to know the period between the restart of the processor (restart immediatly by default)
14    fn recovery_duration(&self) -> Duration {
15        Duration::ZERO
16    }
17}
18
19impl<'a, E: ProcError + 'a> From<E> for Box<dyn ProcError + 'a> {
20    fn from(err: E) -> Box<dyn ProcError + 'a> {
21        Box::new(err)
22    }
23}
24
25impl<'a, E: ProcError + Send + Sync + 'a> From<E> for Box<dyn ProcError + Send + Sync + 'a> {
26    fn from(err: E) -> Box<dyn ProcError + Send + Sync + 'a> {
27        Box::new(err)
28    }
29}
30
31/// For a ProSA `ConfigError`, no recovery is possible.
32/// The configuration need to be valid
33impl ProcError for ConfigError {
34    fn recoverable(&self) -> bool {
35        false
36    }
37}
38
39impl ProcError for tokio::task::JoinError {
40    fn recoverable(&self) -> bool {
41        self.is_cancelled()
42    }
43}
44
45impl<M> ProcError for tokio::sync::mpsc::error::SendError<InternalMsg<M>>
46where
47    M: Sized + Clone + Tvf,
48{
49    fn recoverable(&self) -> bool {
50        true
51    }
52}
53
54impl ProcError for std::io::Error {
55    fn recoverable(&self) -> bool {
56        matches!(
57            self.kind(),
58            std::io::ErrorKind::ConnectionReset
59                | std::io::ErrorKind::ConnectionAborted
60                | std::io::ErrorKind::NotConnected
61                | std::io::ErrorKind::BrokenPipe
62                | std::io::ErrorKind::WouldBlock
63                | std::io::ErrorKind::InvalidData
64                | std::io::ErrorKind::TimedOut
65                | std::io::ErrorKind::WriteZero
66                | std::io::ErrorKind::Interrupted
67                | std::io::ErrorKind::UnexpectedEof
68                | std::io::ErrorKind::OutOfMemory
69        )
70    }
71}
72
73#[cfg(feature = "openssl")]
74impl ProcError for openssl::error::Error {
75    fn recoverable(&self) -> bool {
76        if let Some(reason) = self.reason() {
77            // If it's an SSL protocol error, consider that can be recoverable. It's may be temporary related to a distant.
78            reason.contains("SSL_")
79        } else {
80            false
81        }
82    }
83}
84
85#[cfg(feature = "openssl")]
86impl ProcError for openssl::error::ErrorStack {
87    fn recoverable(&self) -> bool {
88        for error in self.errors() {
89            if !error.recoverable() {
90                return false;
91            }
92        }
93
94        true
95    }
96}
97
98/// Error define for ProSA bus error (for message exchange)
99#[derive(Debug, Eq, thiserror::Error, PartialEq)]
100pub enum BusError {
101    /// Error that indicate the queue can't forward the internal main message
102    #[error(
103        "The Queue can't send the internal main message {0}, proc_id={1}, queue_id={2}, reason={3}"
104    )]
105    InternalMainQueue(&'static str, u32, u32, String),
106    /// Error that indicate the queue can't forward the internal message
107    #[error("The Queue can't send the internal message: {0}")]
108    InternalQueue(String),
109    /// Error that indicate the queue can't forward the internal message
110    #[error("The Processor {0}/{1} can't be contacted: {2}")]
111    ProcComm(u32, u32, String),
112    /// Error on the internal TVF message use for internal exchange
113    #[error("The internal message is not correct: {0}")]
114    InternalTvfMsg(#[from] TvfError),
115    /// There is no data to process, it has been consumed
116    #[error("No data to process")]
117    NoData,
118}
119
120impl ProcError for BusError {
121    fn recoverable(&self) -> bool {
122        matches!(self, BusError::InternalMainQueue(..))
123    }
124}
125
126impl<M> From<SendError<InternalMsg<M>>> for BusError
127where
128    M: Sized + Clone + Tvf,
129{
130    fn from(error: SendError<InternalMsg<M>>) -> Self {
131        BusError::InternalQueue(error.to_string())
132    }
133}