1use std::sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc, Mutex,
5};
6use std::{fmt, io};
7
8#[derive(Debug)]
9pub enum Error {
10 Connection(ConnectionError),
11 Consumer(ConsumerError),
12 Producer(ProducerError),
13 ServiceDiscovery(ServiceDiscoveryError),
14 Authentication(AuthenticationError),
15 Custom(String),
16 Executor,
17}
18
19impl From<ConnectionError> for Error {
20 fn from(err: ConnectionError) -> Self {
21 Error::Connection(err)
22 }
23}
24
25impl From<ConsumerError> for Error {
26 fn from(err: ConsumerError) -> Self {
27 Error::Consumer(err)
28 }
29}
30
31impl From<ProducerError> for Error {
32 fn from(err: ProducerError) -> Self {
33 Error::Producer(err)
34 }
35}
36
37impl From<ServiceDiscoveryError> for Error {
38 fn from(err: ServiceDiscoveryError) -> Self {
39 Error::ServiceDiscovery(err)
40 }
41}
42
43impl fmt::Display for Error {
44 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
45 match self {
46 Error::Connection(e) => write!(f, "Connection error: {}", e),
47 Error::Consumer(e) => write!(f, "consumer error: {}", e),
48 Error::Producer(e) => write!(f, "producer error: {}", e),
49 Error::ServiceDiscovery(e) => write!(f, "service discovery error: {}", e),
50 Error::Authentication(e) => write!(f, "authentication error: {}", e),
51 Error::Custom(e) => write!(f, "error: {}", e),
52 Error::Executor => write!(f, "could not spawn task"),
53 }
54 }
55}
56
57impl std::error::Error for Error {
58 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
59 match self {
60 Error::Connection(e) => e.source(),
61 Error::Consumer(e) => e.source(),
62 Error::Producer(e) => e.source(),
63 Error::ServiceDiscovery(e) => e.source(),
64 Error::Authentication(e) => e.source(),
65 Error::Custom(_) => None,
66 Error::Executor => None,
67 }
68 }
69}
70
71#[derive(Debug)]
72pub enum ConnectionError {
73 Io(io::Error),
74 Disconnected,
75 PulsarError(Option<crate::message::proto::ServerError>, Option<String>),
76 Unexpected(String),
77 Decoding(String),
78 Encoding(String),
79 SocketAddr(String),
80 UnexpectedResponse(String),
81 Tls(native_tls::Error),
82 Authentication(AuthenticationError),
83 NotFound,
84 Canceled,
85 Shutdown,
86}
87
88impl From<io::Error> for ConnectionError {
89 fn from(err: io::Error) -> Self {
90 ConnectionError::Io(err)
91 }
92}
93
94impl From<native_tls::Error> for ConnectionError {
95 fn from(err: native_tls::Error) -> Self {
96 ConnectionError::Tls(err)
97 }
98}
99
100impl From<AuthenticationError> for ConnectionError {
101 fn from(err: AuthenticationError) -> Self {
102 ConnectionError::Authentication(err)
103 }
104}
105
106impl fmt::Display for ConnectionError {
107 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108 match self {
109 ConnectionError::Io(e) => write!(f, "{}", e),
110 ConnectionError::Disconnected => write!(f, "Disconnected"),
111 ConnectionError::PulsarError(e, s) => {
112 write!(f, "Server error ({:?}): {}", e, s.as_deref().unwrap_or(""))
113 }
114 ConnectionError::Unexpected(e) => write!(f, "{}", e),
115 ConnectionError::Decoding(e) => write!(f, "Error decoding message: {}", e),
116 ConnectionError::Encoding(e) => write!(f, "Error encoding message: {}", e),
117 ConnectionError::SocketAddr(e) => write!(f, "Error obtaining socket address: {}", e),
118 ConnectionError::Tls(e) => write!(f, "Error connecting TLS stream: {}", e),
119 ConnectionError::Authentication(e) => write!(f, "Error authentication: {}", e),
120 ConnectionError::UnexpectedResponse(e) => {
121 write!(f, "Unexpected response from pulsar: {}", e)
122 }
123 ConnectionError::NotFound => write!(f, "error looking up URL"),
124 ConnectionError::Canceled => write!(f, "canceled request"),
125 ConnectionError::Shutdown => write!(f, "The connection was shut down"),
126 }
127 }
128}
129
130impl std::error::Error for ConnectionError {
131 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
132 match self {
133 ConnectionError::Io(e) => Some(e),
134 _ => None,
135 }
136 }
137}
138
139#[derive(Debug)]
140pub enum ConsumerError {
141 Connection(ConnectionError),
142 MissingPayload(String),
143 Io(io::Error),
144 ChannelFull,
145 Closed,
146 BuildError,
147}
148
149impl From<ConnectionError> for ConsumerError {
150 fn from(err: ConnectionError) -> Self {
151 ConsumerError::Connection(err)
152 }
153}
154
155impl From<io::Error> for ConsumerError {
156 fn from(err: io::Error) -> Self {
157 ConsumerError::Io(err)
158 }
159}
160
161impl From<futures::channel::mpsc::SendError> for ConsumerError {
162 fn from(err: futures::channel::mpsc::SendError) -> Self {
163 if err.is_full() {
164 ConsumerError::ChannelFull
165 } else {
166 ConsumerError::Closed
167 }
168 }
169}
170
171impl fmt::Display for ConsumerError {
172 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
173 match self {
174 ConsumerError::Connection(e) => write!(f, "Connection error: {}", e),
175 ConsumerError::MissingPayload(s) => write!(f, "Missing payload: {}", s),
176 ConsumerError::Io(s) => write!(f, "Decompression error: {}", s),
177 ConsumerError::ChannelFull => write!(
178 f,
179 "cannot send message to the consumer engine: the channel is full"
180 ),
181 ConsumerError::Closed => write!(
182 f,
183 "cannot send message to the consumer engine: the channel is closed"
184 ),
185 ConsumerError::BuildError => write!(f, "Error while building the consumer."),
186 }
187 }
188}
189
190impl std::error::Error for ConsumerError {
191 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
192 match self {
193 ConsumerError::Connection(e) => Some(e),
194 _ => None,
195 }
196 }
197}
198
199pub enum ProducerError {
200 Connection(ConnectionError),
201 Custom(String),
202 Io(io::Error),
203 PartialSend(Vec<Result<SendFuture, Error>>),
204 Batch(Arc<Error>),
206}
207
208impl From<ConnectionError> for ProducerError {
209 fn from(err: ConnectionError) -> Self {
210 ProducerError::Connection(err)
211 }
212}
213
214impl From<io::Error> for ProducerError {
215 fn from(err: io::Error) -> Self {
216 ProducerError::Io(err)
217 }
218}
219
220impl fmt::Display for ProducerError {
221 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
222 match self {
223 ProducerError::Connection(e) => write!(f, "Connection error: {}", e),
224 ProducerError::Io(e) => write!(f, "Compression error: {}", e),
225 ProducerError::Custom(s) => write!(f, "Custom error: {}", s),
226 ProducerError::Batch(e) => write!(f, "Batch error: {}", e),
227 ProducerError::PartialSend(e) => {
228 let (successes, failures) = e.iter().fold((0, 0), |(s, f), r| match r {
229 Ok(_) => (s + 1, f),
230 Err(_) => (s, f + 1),
231 });
232 write!(
233 f,
234 "Partial send error - {} successful, {} failed",
235 successes, failures
236 )?;
237
238 if failures > 0 {
239 let first_error = e
240 .iter()
241 .find(|r| r.is_err())
242 .unwrap()
243 .as_ref()
244 .map(drop)
245 .unwrap_err();
246 write!(f, "first error: {}", first_error)?;
247 }
248 Ok(())
249 }
250 }
251 }
252}
253
254impl fmt::Debug for ProducerError {
255 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256 match self {
257 ProducerError::Connection(e) => write!(f, "Connection({:?})", e),
258 ProducerError::Custom(msg) => write!(f, "Custom({:?})", msg),
259 ProducerError::Io(e) => write!(f, "Connection({:?})", e),
260 ProducerError::Batch(e) => write!(f, "Connection({:?})", e),
261 ProducerError::PartialSend(parts) => {
262 write!(f, "PartialSend(")?;
263 for (i, part) in parts.iter().enumerate() {
264 match part {
265 Ok(_) => write!(f, "Ok(SendFuture)")?,
266 Err(e) => write!(f, "Err({:?})", e)?,
267 }
268 if i < (parts.len() - 1) {
269 write!(f, ", ")?;
270 }
271 }
272 write!(f, ")")
273 }
274 }
275 }
276}
277
278impl std::error::Error for ProducerError {
279 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
280 match self {
281 ProducerError::Connection(e) => Some(e),
282 ProducerError::Io(e) => Some(e),
283 ProducerError::Batch(e) => Some(e.as_ref()),
284 ProducerError::PartialSend(parts) => parts
285 .iter()
286 .find(|r| r.is_err())
287 .map(|r| r.as_ref().map(drop).unwrap_err() as _),
288 ProducerError::Custom(_) => None,
289 }
290 }
291}
292
293#[derive(Debug)]
294pub enum ServiceDiscoveryError {
295 Connection(ConnectionError),
296 Query(Option<crate::message::proto::ServerError>, Option<String>),
297 NotFound,
298 DnsLookupError,
299 Canceled,
300 Shutdown,
301 Dummy,
302}
303
304impl From<ConnectionError> for ServiceDiscoveryError {
305 fn from(err: ConnectionError) -> Self {
306 ServiceDiscoveryError::Connection(err)
307 }
308}
309
310impl fmt::Display for ServiceDiscoveryError {
311 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
312 match self {
313 ServiceDiscoveryError::Connection(e) => write!(f, "Connection error: {}", e),
314 ServiceDiscoveryError::Query(e, s) => {
315 write!(f, "Query error ({:?}): {}", e, s.as_deref().unwrap_or(""))
316 }
317 ServiceDiscoveryError::NotFound => write!(f, "cannot find topic"),
318 ServiceDiscoveryError::DnsLookupError => write!(f, "cannot lookup broker address"),
319 ServiceDiscoveryError::Canceled => write!(f, "canceled request"),
320 ServiceDiscoveryError::Shutdown => write!(f, "service discovery engine not responding"),
321 ServiceDiscoveryError::Dummy => write!(f, "placeholder error"),
322 }
323 }
324}
325
326impl std::error::Error for ServiceDiscoveryError {
327 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
328 match self {
329 ServiceDiscoveryError::Connection(e) => Some(e),
330 _ => None,
331 }
332 }
333}
334
335#[derive(Debug)]
336pub enum AuthenticationError {
337 Custom(String)
338}
339
340impl fmt::Display for AuthenticationError {
341 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342 match self {
343 AuthenticationError::Custom(m) => write!(f, "authentication error [{}]", m)
344 }
345 }
346}
347
348impl std::error::Error for AuthenticationError {
349
350}
351
352#[derive(Clone)]
353pub(crate) struct SharedError {
354 error_set: Arc<AtomicBool>,
355 error: Arc<Mutex<Option<ConnectionError>>>,
356}
357
358impl SharedError {
359 pub fn new() -> SharedError {
360 SharedError {
361 error_set: Arc::new(AtomicBool::new(false)),
362 error: Arc::new(Mutex::new(None)),
363 }
364 }
365
366 pub fn is_set(&self) -> bool {
367 self.error_set.load(Ordering::Relaxed)
368 }
369
370 pub fn remove(&self) -> Option<ConnectionError> {
371 let mut lock = self.error.lock().unwrap();
372 let error = lock.take();
373 self.error_set.store(false, Ordering::Release);
374 error
375 }
376
377 pub fn set(&self, error: ConnectionError) {
378 let mut lock = self.error.lock().unwrap();
379 *lock = Some(error);
380 self.error_set.store(true, Ordering::Release);
381 }
382}
383
384use crate::message::proto::ServerError;
385use crate::producer::SendFuture;
386
387pub(crate) fn server_error(i: i32) -> Option<ServerError> {
388 match i {
389 0 => Some(ServerError::UnknownError),
390 1 => Some(ServerError::MetadataError),
391 2 => Some(ServerError::PersistenceError),
392 3 => Some(ServerError::AuthenticationError),
393 4 => Some(ServerError::AuthorizationError),
394 5 => Some(ServerError::ConsumerBusy),
395 6 => Some(ServerError::ServiceNotReady),
396 7 => Some(ServerError::ProducerBlockedQuotaExceededError),
397 8 => Some(ServerError::ProducerBlockedQuotaExceededException),
398 9 => Some(ServerError::ChecksumError),
399 10 => Some(ServerError::UnsupportedVersionError),
400 11 => Some(ServerError::TopicNotFound),
401 12 => Some(ServerError::SubscriptionNotFound),
402 13 => Some(ServerError::ConsumerNotFound),
403 14 => Some(ServerError::TooManyRequests),
404 15 => Some(ServerError::TopicTerminatedError),
405 16 => Some(ServerError::ProducerBusy),
406 17 => Some(ServerError::InvalidTopicName),
407 _ => None,
414 }
415}