pulsar/
error.rs

1//! Error types
2use std::sync::{
3    atomic::{AtomicBool, Ordering},
4    Arc, Mutex,
5};
6use std::{fmt, io};
7
8#[derive(Debug)]
9pub enum Error {
10    Connection(ConnectionError),
11    Consumer(ConsumerError),
12    Producer(ProducerError),
13    ServiceDiscovery(ServiceDiscoveryError),
14    Authentication(AuthenticationError),
15    Custom(String),
16    Executor,
17}
18
19impl From<ConnectionError> for Error {
20    fn from(err: ConnectionError) -> Self {
21        Error::Connection(err)
22    }
23}
24
25impl From<ConsumerError> for Error {
26    fn from(err: ConsumerError) -> Self {
27        Error::Consumer(err)
28    }
29}
30
31impl From<ProducerError> for Error {
32    fn from(err: ProducerError) -> Self {
33        Error::Producer(err)
34    }
35}
36
37impl From<ServiceDiscoveryError> for Error {
38    fn from(err: ServiceDiscoveryError) -> Self {
39        Error::ServiceDiscovery(err)
40    }
41}
42
43impl fmt::Display for Error {
44    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
45        match self {
46            Error::Connection(e) => write!(f, "Connection error: {}", e),
47            Error::Consumer(e) => write!(f, "consumer error: {}", e),
48            Error::Producer(e) => write!(f, "producer error: {}", e),
49            Error::ServiceDiscovery(e) => write!(f, "service discovery error: {}", e),
50            Error::Authentication(e) => write!(f, "authentication error: {}", e),
51            Error::Custom(e) => write!(f, "error: {}", e),
52            Error::Executor => write!(f, "could not spawn task"),
53        }
54    }
55}
56
57impl std::error::Error for Error {
58    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
59        match self {
60            Error::Connection(e) => e.source(),
61            Error::Consumer(e) => e.source(),
62            Error::Producer(e) => e.source(),
63            Error::ServiceDiscovery(e) => e.source(),
64            Error::Authentication(e) => e.source(),
65            Error::Custom(_) => None,
66            Error::Executor => None,
67        }
68    }
69}
70
71#[derive(Debug)]
72pub enum ConnectionError {
73    Io(io::Error),
74    Disconnected,
75    PulsarError(Option<crate::message::proto::ServerError>, Option<String>),
76    Unexpected(String),
77    Decoding(String),
78    Encoding(String),
79    SocketAddr(String),
80    UnexpectedResponse(String),
81    Tls(native_tls::Error),
82    Authentication(AuthenticationError),
83    NotFound,
84    Canceled,
85    Shutdown,
86}
87
88impl From<io::Error> for ConnectionError {
89    fn from(err: io::Error) -> Self {
90        ConnectionError::Io(err)
91    }
92}
93
94impl From<native_tls::Error> for ConnectionError {
95    fn from(err: native_tls::Error) -> Self {
96        ConnectionError::Tls(err)
97    }
98}
99
100impl From<AuthenticationError> for ConnectionError {
101    fn from(err: AuthenticationError) -> Self {
102        ConnectionError::Authentication(err)
103    }
104}
105
106impl fmt::Display for ConnectionError {
107    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108        match self {
109            ConnectionError::Io(e) => write!(f, "{}", e),
110            ConnectionError::Disconnected => write!(f, "Disconnected"),
111            ConnectionError::PulsarError(e, s) => {
112                write!(f, "Server error ({:?}): {}", e, s.as_deref().unwrap_or(""))
113            }
114            ConnectionError::Unexpected(e) => write!(f, "{}", e),
115            ConnectionError::Decoding(e) => write!(f, "Error decoding message: {}", e),
116            ConnectionError::Encoding(e) => write!(f, "Error encoding message: {}", e),
117            ConnectionError::SocketAddr(e) => write!(f, "Error obtaining socket address: {}", e),
118            ConnectionError::Tls(e) => write!(f, "Error connecting TLS stream: {}", e),
119            ConnectionError::Authentication(e) => write!(f, "Error authentication: {}", e),
120            ConnectionError::UnexpectedResponse(e) => {
121                write!(f, "Unexpected response from pulsar: {}", e)
122            }
123            ConnectionError::NotFound => write!(f, "error looking up URL"),
124            ConnectionError::Canceled => write!(f, "canceled request"),
125            ConnectionError::Shutdown => write!(f, "The connection was shut down"),
126        }
127    }
128}
129
130impl std::error::Error for ConnectionError {
131    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
132        match self {
133            ConnectionError::Io(e) => Some(e),
134            _ => None,
135        }
136    }
137}
138
139#[derive(Debug)]
140pub enum ConsumerError {
141    Connection(ConnectionError),
142    MissingPayload(String),
143    Io(io::Error),
144    ChannelFull,
145    Closed,
146    BuildError,
147}
148
149impl From<ConnectionError> for ConsumerError {
150    fn from(err: ConnectionError) -> Self {
151        ConsumerError::Connection(err)
152    }
153}
154
155impl From<io::Error> for ConsumerError {
156    fn from(err: io::Error) -> Self {
157        ConsumerError::Io(err)
158    }
159}
160
161impl From<futures::channel::mpsc::SendError> for ConsumerError {
162    fn from(err: futures::channel::mpsc::SendError) -> Self {
163        if err.is_full() {
164            ConsumerError::ChannelFull
165        } else {
166            ConsumerError::Closed
167        }
168    }
169}
170
171impl fmt::Display for ConsumerError {
172    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
173        match self {
174            ConsumerError::Connection(e) => write!(f, "Connection error: {}", e),
175            ConsumerError::MissingPayload(s) => write!(f, "Missing payload: {}", s),
176            ConsumerError::Io(s) => write!(f, "Decompression error: {}", s),
177            ConsumerError::ChannelFull => write!(
178                f,
179                "cannot send message to the consumer engine: the channel is full"
180            ),
181            ConsumerError::Closed => write!(
182                f,
183                "cannot send message to the consumer engine: the channel is closed"
184            ),
185            ConsumerError::BuildError => write!(f, "Error while building the consumer."),
186        }
187    }
188}
189
190impl std::error::Error for ConsumerError {
191    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
192        match self {
193            ConsumerError::Connection(e) => Some(e),
194            _ => None,
195        }
196    }
197}
198
199pub enum ProducerError {
200    Connection(ConnectionError),
201    Custom(String),
202    Io(io::Error),
203    PartialSend(Vec<Result<SendFuture, Error>>),
204    /// Indiciates the error was part of sending a batch, and thus shared across the batch
205    Batch(Arc<Error>),
206}
207
208impl From<ConnectionError> for ProducerError {
209    fn from(err: ConnectionError) -> Self {
210        ProducerError::Connection(err)
211    }
212}
213
214impl From<io::Error> for ProducerError {
215    fn from(err: io::Error) -> Self {
216        ProducerError::Io(err)
217    }
218}
219
220impl fmt::Display for ProducerError {
221    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
222        match self {
223            ProducerError::Connection(e) => write!(f, "Connection error: {}", e),
224            ProducerError::Io(e) => write!(f, "Compression error: {}", e),
225            ProducerError::Custom(s) => write!(f, "Custom error: {}", s),
226            ProducerError::Batch(e) => write!(f, "Batch error: {}", e),
227            ProducerError::PartialSend(e) => {
228                let (successes, failures) = e.iter().fold((0, 0), |(s, f), r| match r {
229                    Ok(_) => (s + 1, f),
230                    Err(_) => (s, f + 1),
231                });
232                write!(
233                    f,
234                    "Partial send error - {} successful, {} failed",
235                    successes, failures
236                )?;
237
238                if failures > 0 {
239                    let first_error = e
240                        .iter()
241                        .find(|r| r.is_err())
242                        .unwrap()
243                        .as_ref()
244                        .map(drop)
245                        .unwrap_err();
246                    write!(f, "first error: {}", first_error)?;
247                }
248                Ok(())
249            }
250        }
251    }
252}
253
254impl fmt::Debug for ProducerError {
255    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256        match self {
257            ProducerError::Connection(e) => write!(f, "Connection({:?})", e),
258            ProducerError::Custom(msg) => write!(f, "Custom({:?})", msg),
259            ProducerError::Io(e) => write!(f, "Connection({:?})", e),
260            ProducerError::Batch(e) => write!(f, "Connection({:?})", e),
261            ProducerError::PartialSend(parts) => {
262                write!(f, "PartialSend(")?;
263                for (i, part) in parts.iter().enumerate() {
264                    match part {
265                        Ok(_) => write!(f, "Ok(SendFuture)")?,
266                        Err(e) => write!(f, "Err({:?})", e)?,
267                    }
268                    if i < (parts.len() - 1) {
269                        write!(f, ", ")?;
270                    }
271                }
272                write!(f, ")")
273            }
274        }
275    }
276}
277
278impl std::error::Error for ProducerError {
279    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
280        match self {
281            ProducerError::Connection(e) => Some(e),
282            ProducerError::Io(e) => Some(e),
283            ProducerError::Batch(e) => Some(e.as_ref()),
284            ProducerError::PartialSend(parts) => parts
285                .iter()
286                .find(|r| r.is_err())
287                .map(|r| r.as_ref().map(drop).unwrap_err() as _),
288            ProducerError::Custom(_) => None,
289        }
290    }
291}
292
293#[derive(Debug)]
294pub enum ServiceDiscoveryError {
295    Connection(ConnectionError),
296    Query(Option<crate::message::proto::ServerError>, Option<String>),
297    NotFound,
298    DnsLookupError,
299    Canceled,
300    Shutdown,
301    Dummy,
302}
303
304impl From<ConnectionError> for ServiceDiscoveryError {
305    fn from(err: ConnectionError) -> Self {
306        ServiceDiscoveryError::Connection(err)
307    }
308}
309
310impl fmt::Display for ServiceDiscoveryError {
311    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
312        match self {
313            ServiceDiscoveryError::Connection(e) => write!(f, "Connection error: {}", e),
314            ServiceDiscoveryError::Query(e, s) => {
315                write!(f, "Query error ({:?}): {}", e, s.as_deref().unwrap_or(""))
316            }
317            ServiceDiscoveryError::NotFound => write!(f, "cannot find topic"),
318            ServiceDiscoveryError::DnsLookupError => write!(f, "cannot lookup broker address"),
319            ServiceDiscoveryError::Canceled => write!(f, "canceled request"),
320            ServiceDiscoveryError::Shutdown => write!(f, "service discovery engine not responding"),
321            ServiceDiscoveryError::Dummy => write!(f, "placeholder error"),
322        }
323    }
324}
325
326impl std::error::Error for ServiceDiscoveryError {
327    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
328        match self {
329            ServiceDiscoveryError::Connection(e) => Some(e),
330            _ => None,
331        }
332    }
333}
334
335#[derive(Debug)]
336pub enum AuthenticationError {
337    Custom(String)
338}
339
340impl fmt::Display for AuthenticationError {
341    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342        match self {
343            AuthenticationError::Custom(m) => write!(f, "authentication error [{}]", m)
344        }
345    }
346}
347
348impl std::error::Error for AuthenticationError {
349
350}
351
352#[derive(Clone)]
353pub(crate) struct SharedError {
354    error_set: Arc<AtomicBool>,
355    error: Arc<Mutex<Option<ConnectionError>>>,
356}
357
358impl SharedError {
359    pub fn new() -> SharedError {
360        SharedError {
361            error_set: Arc::new(AtomicBool::new(false)),
362            error: Arc::new(Mutex::new(None)),
363        }
364    }
365
366    pub fn is_set(&self) -> bool {
367        self.error_set.load(Ordering::Relaxed)
368    }
369
370    pub fn remove(&self) -> Option<ConnectionError> {
371        let mut lock = self.error.lock().unwrap();
372        let error = lock.take();
373        self.error_set.store(false, Ordering::Release);
374        error
375    }
376
377    pub fn set(&self, error: ConnectionError) {
378        let mut lock = self.error.lock().unwrap();
379        *lock = Some(error);
380        self.error_set.store(true, Ordering::Release);
381    }
382}
383
384use crate::message::proto::ServerError;
385use crate::producer::SendFuture;
386
387pub(crate) fn server_error(i: i32) -> Option<ServerError> {
388    match i {
389        0 => Some(ServerError::UnknownError),
390        1 => Some(ServerError::MetadataError),
391        2 => Some(ServerError::PersistenceError),
392        3 => Some(ServerError::AuthenticationError),
393        4 => Some(ServerError::AuthorizationError),
394        5 => Some(ServerError::ConsumerBusy),
395        6 => Some(ServerError::ServiceNotReady),
396        7 => Some(ServerError::ProducerBlockedQuotaExceededError),
397        8 => Some(ServerError::ProducerBlockedQuotaExceededException),
398        9 => Some(ServerError::ChecksumError),
399        10 => Some(ServerError::UnsupportedVersionError),
400        11 => Some(ServerError::TopicNotFound),
401        12 => Some(ServerError::SubscriptionNotFound),
402        13 => Some(ServerError::ConsumerNotFound),
403        14 => Some(ServerError::TooManyRequests),
404        15 => Some(ServerError::TopicTerminatedError),
405        16 => Some(ServerError::ProducerBusy),
406        17 => Some(ServerError::InvalidTopicName),
407        /* FIXME: why aren't they found by the compiler? Max enum size?
408        18 => Some(ServerError::IncompatibleSchema),
409        19 => Some(ServerError::ConsumerAssignError),
410        20 => Some(ServerError::TransactionCoordinatorNotFound),
411        21 => Some(ServerError::InvalidTxnStatus),
412        */
413        _ => None,
414    }
415}