1use std::sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc, Mutex,
5};
6use std::{fmt, io};
7
8#[derive(Debug)]
9pub enum Error {
10 Connection(ConnectionError),
11 Consumer(ConsumerError),
12 Producer(ProducerError),
13 ServiceDiscovery(ServiceDiscoveryError),
14 Authentication(AuthenticationError),
15 Custom(String),
16 Executor,
17}
18
19impl From<ConnectionError> for Error {
20 fn from(err: ConnectionError) -> Self {
21 Error::Connection(err)
22 }
23}
24
25impl From<ConsumerError> for Error {
26 fn from(err: ConsumerError) -> Self {
27 Error::Consumer(err)
28 }
29}
30
31impl From<ProducerError> for Error {
32 fn from(err: ProducerError) -> Self {
33 Error::Producer(err)
34 }
35}
36
37impl From<ServiceDiscoveryError> for Error {
38 fn from(err: ServiceDiscoveryError) -> Self {
39 Error::ServiceDiscovery(err)
40 }
41}
42
43impl fmt::Display for Error {
44 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
45 match self {
46 Error::Connection(e) => write!(f, "Connection error: {}", e),
47 Error::Consumer(e) => write!(f, "consumer error: {}", e),
48 Error::Producer(e) => write!(f, "producer error: {}", e),
49 Error::ServiceDiscovery(e) => write!(f, "service discovery error: {}", e),
50 Error::Authentication(e) => write!(f, "authentication error: {}", e),
51 Error::Custom(e) => write!(f, "error: {}", e),
52 Error::Executor => write!(f, "could not spawn task"),
53 }
54 }
55}
56
57impl std::error::Error for Error {
58 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
59 match self {
60 Error::Connection(e) => e.source(),
61 Error::Consumer(e) => e.source(),
62 Error::Producer(e) => e.source(),
63 Error::ServiceDiscovery(e) => e.source(),
64 Error::Authentication(e) => e.source(),
65 Error::Custom(_) => None,
66 Error::Executor => None,
67 }
68 }
69}
70
71#[derive(Debug)]
72pub enum ConnectionError {
73 Io(io::Error),
74 Disconnected,
75 PulsarError(Option<crate::message::proto::ServerError>, Option<String>),
76 Unexpected(String),
77 Decoding(String),
78 Encoding(String),
79 SocketAddr(String),
80 UnexpectedResponse(String),
81 Tls(native_tls::Error),
82 Authentication(AuthenticationError),
83 NotFound,
84 Canceled,
85 Shutdown,
86}
87
88impl From<io::Error> for ConnectionError {
89 fn from(err: io::Error) -> Self {
90 ConnectionError::Io(err)
91 }
92}
93
94impl From<native_tls::Error> for ConnectionError {
95 fn from(err: native_tls::Error) -> Self {
96 ConnectionError::Tls(err)
97 }
98}
99
100impl From<AuthenticationError> for ConnectionError {
101 fn from(err: AuthenticationError) -> Self {
102 ConnectionError::Authentication(err)
103 }
104}
105
106impl fmt::Display for ConnectionError {
107 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108 match self {
109 ConnectionError::Io(e) => write!(f, "{}", e),
110 ConnectionError::Disconnected => write!(f, "Disconnected"),
111 ConnectionError::PulsarError(e, s) => {
112 write!(f, "Server error ({:?}): {}", e, s.as_deref().unwrap_or(""))
113 }
114 ConnectionError::Unexpected(e) => write!(f, "{}", e),
115 ConnectionError::Decoding(e) => write!(f, "Error decoding message: {}", e),
116 ConnectionError::Encoding(e) => write!(f, "Error encoding message: {}", e),
117 ConnectionError::SocketAddr(e) => write!(f, "Error obtaining socket address: {}", e),
118 ConnectionError::Tls(e) => write!(f, "Error connecting TLS stream: {}", e),
119 ConnectionError::Authentication(e) => write!(f, "Error authentication: {}", e),
120 ConnectionError::UnexpectedResponse(e) => {
121 write!(f, "Unexpected response from pulsar: {}", e)
122 }
123 ConnectionError::NotFound => write!(f, "error looking up URL"),
124 ConnectionError::Canceled => write!(f, "canceled request"),
125 ConnectionError::Shutdown => write!(f, "The connection was shut down"),
126 }
127 }
128}
129
130impl std::error::Error for ConnectionError {
131 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
132 match self {
133 ConnectionError::Io(e) => Some(e),
134 _ => None,
135 }
136 }
137}
138
139#[derive(Debug)]
140pub enum ConsumerError {
141 Connection(ConnectionError),
142 MissingPayload(String),
143 Io(io::Error),
144 ChannelFull,
145 Closed,
146 BuildError,
147}
148
149impl From<ConnectionError> for ConsumerError {
150 fn from(err: ConnectionError) -> Self {
151 ConsumerError::Connection(err)
152 }
153}
154
155impl From<io::Error> for ConsumerError {
156 fn from(err: io::Error) -> Self {
157 ConsumerError::Io(err)
158 }
159}
160
161impl From<futures::channel::mpsc::SendError> for ConsumerError {
162 fn from(err: futures::channel::mpsc::SendError) -> Self {
163 if err.is_full() {
164 ConsumerError::ChannelFull
165 } else {
166 ConsumerError::Closed
167 }
168 }
169}
170
171impl fmt::Display for ConsumerError {
172 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
173 match self {
174 ConsumerError::Connection(e) => write!(f, "Connection error: {}", e),
175 ConsumerError::MissingPayload(s) => write!(f, "Missing payload: {}", s),
176 ConsumerError::Io(s) => write!(f, "Decompression error: {}", s),
177 ConsumerError::ChannelFull => write!(
178 f,
179 "cannot send message to the consumer engine: the channel is full"
180 ),
181 ConsumerError::Closed => write!(
182 f,
183 "cannot send message to the consumer engine: the channel is closed"
184 ),
185 ConsumerError::BuildError => write!(f, "Error while building the consumer."),
186 }
187 }
188}
189
190impl std::error::Error for ConsumerError {
191 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
192 match self {
193 ConsumerError::Connection(e) => Some(e),
194 _ => None,
195 }
196 }
197}
198
199pub enum ProducerError {
200 Connection(ConnectionError),
201 Custom(String),
202 Io(io::Error),
203 PartialSend(Vec<Result<SendFuture, Error>>),
204 Batch(Arc<Error>),
206 Fenced,
208}
209
210impl From<ConnectionError> for ProducerError {
211 fn from(err: ConnectionError) -> Self {
212 ProducerError::Connection(err)
213 }
214}
215
216impl From<io::Error> for ProducerError {
217 fn from(err: io::Error) -> Self {
218 ProducerError::Io(err)
219 }
220}
221
222impl fmt::Display for ProducerError {
223 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
224 match self {
225 ProducerError::Connection(e) => write!(f, "Connection error: {}", e),
226 ProducerError::Io(e) => write!(f, "Compression error: {}", e),
227 ProducerError::Custom(s) => write!(f, "Custom error: {}", s),
228 ProducerError::Batch(e) => write!(f, "Batch error: {}", e),
229 ProducerError::PartialSend(e) => {
230 let (successes, failures) = e.iter().fold((0, 0), |(s, f), r| match r {
231 Ok(_) => (s + 1, f),
232 Err(_) => (s, f + 1),
233 });
234 write!(
235 f,
236 "Partial send error - {} successful, {} failed",
237 successes, failures
238 )?;
239
240 if failures > 0 {
241 let first_error = e
242 .iter()
243 .find(|r| r.is_err())
244 .unwrap()
245 .as_ref()
246 .map(drop)
247 .unwrap_err();
248 write!(f, "first error: {}", first_error)?;
249 }
250 Ok(())
251 }
252 ProducerError::Fenced => write!(f, "Producer is fenced"),
253 }
254 }
255}
256
257impl fmt::Debug for ProducerError {
258 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259 match self {
260 ProducerError::Connection(e) => write!(f, "Connection({:?})", e),
261 ProducerError::Custom(msg) => write!(f, "Custom({:?})", msg),
262 ProducerError::Io(e) => write!(f, "Connection({:?})", e),
263 ProducerError::Batch(e) => write!(f, "Connection({:?})", e),
264 ProducerError::PartialSend(parts) => {
265 write!(f, "PartialSend(")?;
266 for (i, part) in parts.iter().enumerate() {
267 match part {
268 Ok(_) => write!(f, "Ok(SendFuture)")?,
269 Err(e) => write!(f, "Err({:?})", e)?,
270 }
271 if i < (parts.len() - 1) {
272 write!(f, ", ")?;
273 }
274 }
275 write!(f, ")")
276 },
277 ProducerError::Fenced => write!(f, "Producer is fenced"),
278 }
279 }
280}
281
282impl std::error::Error for ProducerError {
283 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
284 match self {
285 ProducerError::Connection(e) => Some(e),
286 ProducerError::Io(e) => Some(e),
287 ProducerError::Batch(e) => Some(e.as_ref()),
288 ProducerError::PartialSend(parts) => parts
289 .iter()
290 .find(|r| r.is_err())
291 .map(|r| r.as_ref().map(drop).unwrap_err() as _),
292 ProducerError::Custom(_) => None,
293 ProducerError::Fenced => None,
294 }
295 }
296}
297
298#[derive(Debug)]
299pub enum ServiceDiscoveryError {
300 Connection(ConnectionError),
301 Query(Option<crate::message::proto::ServerError>, Option<String>),
302 NotFound,
303 DnsLookupError,
304 Canceled,
305 Shutdown,
306 Dummy,
307}
308
309impl From<ConnectionError> for ServiceDiscoveryError {
310 fn from(err: ConnectionError) -> Self {
311 ServiceDiscoveryError::Connection(err)
312 }
313}
314
315impl fmt::Display for ServiceDiscoveryError {
316 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
317 match self {
318 ServiceDiscoveryError::Connection(e) => write!(f, "Connection error: {}", e),
319 ServiceDiscoveryError::Query(e, s) => {
320 write!(f, "Query error ({:?}): {}", e, s.as_deref().unwrap_or(""))
321 }
322 ServiceDiscoveryError::NotFound => write!(f, "cannot find topic"),
323 ServiceDiscoveryError::DnsLookupError => write!(f, "cannot lookup broker address"),
324 ServiceDiscoveryError::Canceled => write!(f, "canceled request"),
325 ServiceDiscoveryError::Shutdown => write!(f, "service discovery engine not responding"),
326 ServiceDiscoveryError::Dummy => write!(f, "placeholder error"),
327 }
328 }
329}
330
331impl std::error::Error for ServiceDiscoveryError {
332 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
333 match self {
334 ServiceDiscoveryError::Connection(e) => Some(e),
335 _ => None,
336 }
337 }
338}
339
340#[derive(Debug)]
341pub enum AuthenticationError {
342 Custom(String),
343}
344
345impl fmt::Display for AuthenticationError {
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 match self {
348 AuthenticationError::Custom(m) => write!(f, "authentication error [{}]", m),
349 }
350 }
351}
352
353impl std::error::Error for AuthenticationError {}
354
355#[derive(Clone)]
356pub(crate) struct SharedError {
357 error_set: Arc<AtomicBool>,
358 error: Arc<Mutex<Option<ConnectionError>>>,
359}
360
361impl SharedError {
362 pub fn new() -> SharedError {
363 SharedError {
364 error_set: Arc::new(AtomicBool::new(false)),
365 error: Arc::new(Mutex::new(None)),
366 }
367 }
368
369 pub fn is_set(&self) -> bool {
370 self.error_set.load(Ordering::Relaxed)
371 }
372
373 pub fn remove(&self) -> Option<ConnectionError> {
374 let mut lock = self.error.lock().unwrap();
375 let error = lock.take();
376 self.error_set.store(false, Ordering::Release);
377 error
378 }
379
380 pub fn set(&self, error: ConnectionError) {
381 let mut lock = self.error.lock().unwrap();
382 *lock = Some(error);
383 self.error_set.store(true, Ordering::Release);
384 }
385}
386
387use crate::message::proto::ServerError;
388use crate::producer::SendFuture;
389
390pub(crate) fn server_error(i: i32) -> Option<ServerError> {
391 match i {
392 0 => Some(ServerError::UnknownError),
393 1 => Some(ServerError::MetadataError),
394 2 => Some(ServerError::PersistenceError),
395 3 => Some(ServerError::AuthenticationError),
396 4 => Some(ServerError::AuthorizationError),
397 5 => Some(ServerError::ConsumerBusy),
398 6 => Some(ServerError::ServiceNotReady),
399 7 => Some(ServerError::ProducerBlockedQuotaExceededError),
400 8 => Some(ServerError::ProducerBlockedQuotaExceededException),
401 9 => Some(ServerError::ChecksumError),
402 10 => Some(ServerError::UnsupportedVersionError),
403 11 => Some(ServerError::TopicNotFound),
404 12 => Some(ServerError::SubscriptionNotFound),
405 13 => Some(ServerError::ConsumerNotFound),
406 14 => Some(ServerError::TooManyRequests),
407 15 => Some(ServerError::TopicTerminatedError),
408 16 => Some(ServerError::ProducerBusy),
409 17 => Some(ServerError::InvalidTopicName),
410 18 => Some(ServerError::IncompatibleSchema),
411 19 => Some(ServerError::ConsumerAssignError),
412 20 => Some(ServerError::TransactionCoordinatorNotFound),
413 21 => Some(ServerError::InvalidTxnStatus),
414 22 => Some(ServerError::NotAllowedError),
415 23 => Some(ServerError::TransactionConflict),
416 24 => Some(ServerError::TransactionNotFound),
417 25 => Some(ServerError::ProducerFenced),
418 _ => None,
419 }
420}