metaplex_pulsar/
error.rs

//! Error types: the top-level [`Error`] and the component-specific errors it wraps.
2use std::sync::{
3    atomic::{AtomicBool, Ordering},
4    Arc, Mutex,
5};
6use std::{fmt, io};
7
8#[derive(Debug)]
9pub enum Error {
10    Connection(ConnectionError),
11    Consumer(ConsumerError),
12    Producer(ProducerError),
13    ServiceDiscovery(ServiceDiscoveryError),
14    Authentication(AuthenticationError),
15    Custom(String),
16    Executor,
17}
18
19impl From<ConnectionError> for Error {
20    fn from(err: ConnectionError) -> Self {
21        Error::Connection(err)
22    }
23}
24
25impl From<ConsumerError> for Error {
26    fn from(err: ConsumerError) -> Self {
27        Error::Consumer(err)
28    }
29}
30
31impl From<ProducerError> for Error {
32    fn from(err: ProducerError) -> Self {
33        Error::Producer(err)
34    }
35}
36
37impl From<ServiceDiscoveryError> for Error {
38    fn from(err: ServiceDiscoveryError) -> Self {
39        Error::ServiceDiscovery(err)
40    }
41}
42
43impl fmt::Display for Error {
44    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
45        match self {
46            Error::Connection(e) => write!(f, "Connection error: {}", e),
47            Error::Consumer(e) => write!(f, "consumer error: {}", e),
48            Error::Producer(e) => write!(f, "producer error: {}", e),
49            Error::ServiceDiscovery(e) => write!(f, "service discovery error: {}", e),
50            Error::Authentication(e) => write!(f, "authentication error: {}", e),
51            Error::Custom(e) => write!(f, "error: {}", e),
52            Error::Executor => write!(f, "could not spawn task"),
53        }
54    }
55}
56
57impl std::error::Error for Error {
58    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
59        match self {
60            Error::Connection(e) => e.source(),
61            Error::Consumer(e) => e.source(),
62            Error::Producer(e) => e.source(),
63            Error::ServiceDiscovery(e) => e.source(),
64            Error::Authentication(e) => e.source(),
65            Error::Custom(_) => None,
66            Error::Executor => None,
67        }
68    }
69}
70
71#[derive(Debug)]
72pub enum ConnectionError {
73    Io(io::Error),
74    Disconnected,
75    PulsarError(Option<crate::message::proto::ServerError>, Option<String>),
76    Unexpected(String),
77    Decoding(String),
78    Encoding(String),
79    SocketAddr(String),
80    UnexpectedResponse(String),
81    Tls(native_tls::Error),
82    Authentication(AuthenticationError),
83    NotFound,
84    Canceled,
85    Shutdown,
86}
87
88impl From<io::Error> for ConnectionError {
89    fn from(err: io::Error) -> Self {
90        ConnectionError::Io(err)
91    }
92}
93
94impl From<native_tls::Error> for ConnectionError {
95    fn from(err: native_tls::Error) -> Self {
96        ConnectionError::Tls(err)
97    }
98}
99
100impl From<AuthenticationError> for ConnectionError {
101    fn from(err: AuthenticationError) -> Self {
102        ConnectionError::Authentication(err)
103    }
104}
105
106impl fmt::Display for ConnectionError {
107    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108        match self {
109            ConnectionError::Io(e) => write!(f, "{}", e),
110            ConnectionError::Disconnected => write!(f, "Disconnected"),
111            ConnectionError::PulsarError(e, s) => {
112                write!(f, "Server error ({:?}): {}", e, s.as_deref().unwrap_or(""))
113            }
114            ConnectionError::Unexpected(e) => write!(f, "{}", e),
115            ConnectionError::Decoding(e) => write!(f, "Error decoding message: {}", e),
116            ConnectionError::Encoding(e) => write!(f, "Error encoding message: {}", e),
117            ConnectionError::SocketAddr(e) => write!(f, "Error obtaining socket address: {}", e),
118            ConnectionError::Tls(e) => write!(f, "Error connecting TLS stream: {}", e),
119            ConnectionError::Authentication(e) => write!(f, "Error authentication: {}", e),
120            ConnectionError::UnexpectedResponse(e) => {
121                write!(f, "Unexpected response from pulsar: {}", e)
122            }
123            ConnectionError::NotFound => write!(f, "error looking up URL"),
124            ConnectionError::Canceled => write!(f, "canceled request"),
125            ConnectionError::Shutdown => write!(f, "The connection was shut down"),
126        }
127    }
128}
129
130impl std::error::Error for ConnectionError {
131    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
132        match self {
133            ConnectionError::Io(e) => Some(e),
134            _ => None,
135        }
136    }
137}
138
139#[derive(Debug)]
140pub enum ConsumerError {
141    Connection(ConnectionError),
142    MissingPayload(String),
143    Io(io::Error),
144    ChannelFull,
145    Closed,
146    BuildError,
147}
148
149impl From<ConnectionError> for ConsumerError {
150    fn from(err: ConnectionError) -> Self {
151        ConsumerError::Connection(err)
152    }
153}
154
155impl From<io::Error> for ConsumerError {
156    fn from(err: io::Error) -> Self {
157        ConsumerError::Io(err)
158    }
159}
160
161impl From<futures::channel::mpsc::SendError> for ConsumerError {
162    fn from(err: futures::channel::mpsc::SendError) -> Self {
163        if err.is_full() {
164            ConsumerError::ChannelFull
165        } else {
166            ConsumerError::Closed
167        }
168    }
169}
170
171impl fmt::Display for ConsumerError {
172    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
173        match self {
174            ConsumerError::Connection(e) => write!(f, "Connection error: {}", e),
175            ConsumerError::MissingPayload(s) => write!(f, "Missing payload: {}", s),
176            ConsumerError::Io(s) => write!(f, "Decompression error: {}", s),
177            ConsumerError::ChannelFull => write!(
178                f,
179                "cannot send message to the consumer engine: the channel is full"
180            ),
181            ConsumerError::Closed => write!(
182                f,
183                "cannot send message to the consumer engine: the channel is closed"
184            ),
185            ConsumerError::BuildError => write!(f, "Error while building the consumer."),
186        }
187    }
188}
189
190impl std::error::Error for ConsumerError {
191    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
192        match self {
193            ConsumerError::Connection(e) => Some(e),
194            _ => None,
195        }
196    }
197}
198
199pub enum ProducerError {
200    Connection(ConnectionError),
201    Custom(String),
202    Io(io::Error),
203    PartialSend(Vec<Result<SendFuture, Error>>),
204    /// Indiciates the error was part of sending a batch, and thus shared across the batch
205    Batch(Arc<Error>),
206    /// Indicates this producer has lost exclusive access to the topic. Client can decided whether to recreate or not
207    Fenced,
208}
209
210impl From<ConnectionError> for ProducerError {
211    fn from(err: ConnectionError) -> Self {
212        ProducerError::Connection(err)
213    }
214}
215
216impl From<io::Error> for ProducerError {
217    fn from(err: io::Error) -> Self {
218        ProducerError::Io(err)
219    }
220}
221
222impl fmt::Display for ProducerError {
223    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
224        match self {
225            ProducerError::Connection(e) => write!(f, "Connection error: {}", e),
226            ProducerError::Io(e) => write!(f, "Compression error: {}", e),
227            ProducerError::Custom(s) => write!(f, "Custom error: {}", s),
228            ProducerError::Batch(e) => write!(f, "Batch error: {}", e),
229            ProducerError::PartialSend(e) => {
230                let (successes, failures) = e.iter().fold((0, 0), |(s, f), r| match r {
231                    Ok(_) => (s + 1, f),
232                    Err(_) => (s, f + 1),
233                });
234                write!(
235                    f,
236                    "Partial send error - {} successful, {} failed",
237                    successes, failures
238                )?;
239
240                if failures > 0 {
241                    let first_error = e
242                        .iter()
243                        .find(|r| r.is_err())
244                        .unwrap()
245                        .as_ref()
246                        .map(drop)
247                        .unwrap_err();
248                    write!(f, "first error: {}", first_error)?;
249                }
250                Ok(())
251            }
252            ProducerError::Fenced => write!(f, "Producer is fenced"),
253        }
254    }
255}
256
257impl fmt::Debug for ProducerError {
258    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259        match self {
260            ProducerError::Connection(e) => write!(f, "Connection({:?})", e),
261            ProducerError::Custom(msg) => write!(f, "Custom({:?})", msg),
262            ProducerError::Io(e) => write!(f, "Connection({:?})", e),
263            ProducerError::Batch(e) => write!(f, "Connection({:?})", e),
264            ProducerError::PartialSend(parts) => {
265                write!(f, "PartialSend(")?;
266                for (i, part) in parts.iter().enumerate() {
267                    match part {
268                        Ok(_) => write!(f, "Ok(SendFuture)")?,
269                        Err(e) => write!(f, "Err({:?})", e)?,
270                    }
271                    if i < (parts.len() - 1) {
272                        write!(f, ", ")?;
273                    }
274                }
275                write!(f, ")")
276            }, 
277            ProducerError::Fenced => write!(f, "Producer is fenced"),
278        }
279    }
280}
281
282impl std::error::Error for ProducerError {
283    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
284        match self {
285            ProducerError::Connection(e) => Some(e),
286            ProducerError::Io(e) => Some(e),
287            ProducerError::Batch(e) => Some(e.as_ref()),
288            ProducerError::PartialSend(parts) => parts
289                .iter()
290                .find(|r| r.is_err())
291                .map(|r| r.as_ref().map(drop).unwrap_err() as _),
292            ProducerError::Custom(_) => None,
293            ProducerError::Fenced => None,
294        }
295    }
296}
297
298#[derive(Debug)]
299pub enum ServiceDiscoveryError {
300    Connection(ConnectionError),
301    Query(Option<crate::message::proto::ServerError>, Option<String>),
302    NotFound,
303    DnsLookupError,
304    Canceled,
305    Shutdown,
306    Dummy,
307}
308
309impl From<ConnectionError> for ServiceDiscoveryError {
310    fn from(err: ConnectionError) -> Self {
311        ServiceDiscoveryError::Connection(err)
312    }
313}
314
315impl fmt::Display for ServiceDiscoveryError {
316    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
317        match self {
318            ServiceDiscoveryError::Connection(e) => write!(f, "Connection error: {}", e),
319            ServiceDiscoveryError::Query(e, s) => {
320                write!(f, "Query error ({:?}): {}", e, s.as_deref().unwrap_or(""))
321            }
322            ServiceDiscoveryError::NotFound => write!(f, "cannot find topic"),
323            ServiceDiscoveryError::DnsLookupError => write!(f, "cannot lookup broker address"),
324            ServiceDiscoveryError::Canceled => write!(f, "canceled request"),
325            ServiceDiscoveryError::Shutdown => write!(f, "service discovery engine not responding"),
326            ServiceDiscoveryError::Dummy => write!(f, "placeholder error"),
327        }
328    }
329}
330
331impl std::error::Error for ServiceDiscoveryError {
332    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
333        match self {
334            ServiceDiscoveryError::Connection(e) => Some(e),
335            _ => None,
336        }
337    }
338}
339
/// Error describing an authentication failure.
#[derive(Debug)]
pub enum AuthenticationError {
    /// Free-form failure description.
    Custom(String),
}

impl fmt::Display for AuthenticationError {
    /// Renders the message wrapped in square brackets after a fixed prefix.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The enum has a single variant, so the pattern is irrefutable.
        let Self::Custom(message) = self;
        write!(f, "authentication error [{}]", message)
    }
}

// No underlying cause is stored, so the default `source()` (always `None`) is kept.
impl std::error::Error for AuthenticationError {}
354
355#[derive(Clone)]
356pub(crate) struct SharedError {
357    error_set: Arc<AtomicBool>,
358    error: Arc<Mutex<Option<ConnectionError>>>,
359}
360
361impl SharedError {
362    pub fn new() -> SharedError {
363        SharedError {
364            error_set: Arc::new(AtomicBool::new(false)),
365            error: Arc::new(Mutex::new(None)),
366        }
367    }
368
369    pub fn is_set(&self) -> bool {
370        self.error_set.load(Ordering::Relaxed)
371    }
372
373    pub fn remove(&self) -> Option<ConnectionError> {
374        let mut lock = self.error.lock().unwrap();
375        let error = lock.take();
376        self.error_set.store(false, Ordering::Release);
377        error
378    }
379
380    pub fn set(&self, error: ConnectionError) {
381        let mut lock = self.error.lock().unwrap();
382        *lock = Some(error);
383        self.error_set.store(true, Ordering::Release);
384    }
385}
386
387use crate::message::proto::ServerError;
388use crate::producer::SendFuture;
389
390pub(crate) fn server_error(i: i32) -> Option<ServerError> {
391    match i {
392        0 => Some(ServerError::UnknownError),
393        1 => Some(ServerError::MetadataError),
394        2 => Some(ServerError::PersistenceError),
395        3 => Some(ServerError::AuthenticationError),
396        4 => Some(ServerError::AuthorizationError),
397        5 => Some(ServerError::ConsumerBusy),
398        6 => Some(ServerError::ServiceNotReady),
399        7 => Some(ServerError::ProducerBlockedQuotaExceededError),
400        8 => Some(ServerError::ProducerBlockedQuotaExceededException),
401        9 => Some(ServerError::ChecksumError),
402        10 => Some(ServerError::UnsupportedVersionError),
403        11 => Some(ServerError::TopicNotFound),
404        12 => Some(ServerError::SubscriptionNotFound),
405        13 => Some(ServerError::ConsumerNotFound),
406        14 => Some(ServerError::TooManyRequests),
407        15 => Some(ServerError::TopicTerminatedError),
408        16 => Some(ServerError::ProducerBusy),
409        17 => Some(ServerError::InvalidTopicName),
410        18 => Some(ServerError::IncompatibleSchema),
411        19 => Some(ServerError::ConsumerAssignError),
412        20 => Some(ServerError::TransactionCoordinatorNotFound),
413        21 => Some(ServerError::InvalidTxnStatus),
414        22 => Some(ServerError::NotAllowedError),
415        23 => Some(ServerError::TransactionConflict),
416        24 => Some(ServerError::TransactionNotFound),
417        25 => Some(ServerError::ProducerFenced),
418        _ => None,
419    }
420}