1use std::time::Duration;
2
3use super::{msg::InternalMsg, queue::SendError};
4use prosa_utils::{
5 config::ConfigError,
6 msg::tvf::{Tvf, TvfError},
7};
8
9pub trait ProcError: std::error::Error {
11 fn recoverable(&self) -> bool;
13 fn recovery_duration(&self) -> Duration {
15 Duration::ZERO
16 }
17}
18
19impl<'a, E: ProcError + 'a> From<E> for Box<dyn ProcError + 'a> {
20 fn from(err: E) -> Box<dyn ProcError + 'a> {
21 Box::new(err)
22 }
23}
24
25impl<'a, E: ProcError + Send + Sync + 'a> From<E> for Box<dyn ProcError + Send + Sync + 'a> {
26 fn from(err: E) -> Box<dyn ProcError + Send + Sync + 'a> {
27 Box::new(err)
28 }
29}
30
31impl ProcError for ConfigError {
34 fn recoverable(&self) -> bool {
35 false
36 }
37}
38
39impl ProcError for tokio::task::JoinError {
40 fn recoverable(&self) -> bool {
41 self.is_cancelled()
42 }
43}
44
45impl<M> ProcError for tokio::sync::mpsc::error::SendError<InternalMsg<M>>
46where
47 M: Sized + Clone + Tvf,
48{
49 fn recoverable(&self) -> bool {
50 true
51 }
52}
53
54impl ProcError for std::io::Error {
55 fn recoverable(&self) -> bool {
56 matches!(
57 self.kind(),
58 std::io::ErrorKind::ConnectionReset
59 | std::io::ErrorKind::ConnectionAborted
60 | std::io::ErrorKind::NotConnected
61 | std::io::ErrorKind::BrokenPipe
62 | std::io::ErrorKind::WouldBlock
63 | std::io::ErrorKind::InvalidData
64 | std::io::ErrorKind::TimedOut
65 | std::io::ErrorKind::WriteZero
66 | std::io::ErrorKind::Interrupted
67 | std::io::ErrorKind::UnexpectedEof
68 | std::io::ErrorKind::OutOfMemory
69 )
70 }
71}
72
73#[cfg(feature = "openssl")]
74impl ProcError for openssl::error::Error {
75 fn recoverable(&self) -> bool {
76 if let Some(reason) = self.reason() {
77 reason.contains("SSL_")
79 } else {
80 false
81 }
82 }
83}
84
85#[cfg(feature = "openssl")]
86impl ProcError for openssl::error::ErrorStack {
87 fn recoverable(&self) -> bool {
88 for error in self.errors() {
89 if !error.recoverable() {
90 return false;
91 }
92 }
93
94 true
95 }
96}
97
98#[derive(Debug, Eq, thiserror::Error, PartialEq)]
100pub enum BusError {
101 #[error(
103 "The Queue can't send the internal main message {0}, proc_id={1}, queue_id={2}, reason={3}"
104 )]
105 InternalMainQueue(&'static str, u32, u32, String),
106 #[error("The Queue can't send the internal message: {0}")]
108 InternalQueue(String),
109 #[error("The Processor {0}/{1} can't be contacted: {2}")]
111 ProcComm(u32, u32, String),
112 #[error("The internal message is not correct: {0}")]
114 InternalTvfMsg(#[from] TvfError),
115 #[error("No data to process")]
117 NoData,
118}
119
120impl ProcError for BusError {
121 fn recoverable(&self) -> bool {
122 matches!(self, BusError::InternalMainQueue(..))
123 }
124}
125
126impl<M> From<SendError<InternalMsg<M>>> for BusError
127where
128 M: Sized + Clone + Tvf,
129{
130 fn from(error: SendError<InternalMsg<M>>) -> Self {
131 BusError::InternalQueue(error.to_string())
132 }
133}