snakeice_rdkafka/
error.rs

1//! Error manipulations.
2
3use std::error::Error;
4use std::ffi::{self, CStr};
5use std::fmt;
6use std::ptr;
7use std::sync::Arc;
8
9use rdkafka_sys as rdsys;
10use rdkafka_sys::types::*;
11
12use crate::util::{KafkaDrop, NativePtr};
13
14// Re-export rdkafka error code
15pub use rdsys::types::RDKafkaErrorCode;
16
17/// Kafka result.
18pub type KafkaResult<T> = Result<T, KafkaError>;
19
20/// Verify if the value represents an error condition.
21///
22/// Some librdkafka codes are informational, rather than true errors.
23pub trait IsError {
24    /// Reports whether the value represents an error.
25    fn is_error(&self) -> bool;
26}
27
28impl IsError for RDKafkaRespErr {
29    fn is_error(&self) -> bool {
30        *self != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR
31    }
32}
33
34impl IsError for RDKafkaConfRes {
35    fn is_error(&self) -> bool {
36        *self != RDKafkaConfRes::RD_KAFKA_CONF_OK
37    }
38}
39
40impl IsError for RDKafkaError {
41    fn is_error(&self) -> bool {
42        self.0.is_some()
43    }
44}
45
46/// Native rdkafka error.
47#[derive(Clone)]
48pub struct RDKafkaError(Option<Arc<NativePtr<rdsys::rd_kafka_error_t>>>);
49
50unsafe impl KafkaDrop for rdsys::rd_kafka_error_t {
51    const TYPE: &'static str = "error";
52    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_error_destroy;
53}
54
55unsafe impl Send for RDKafkaError {}
56unsafe impl Sync for RDKafkaError {}
57
58impl RDKafkaError {
59    pub(crate) unsafe fn from_ptr(ptr: *mut rdsys::rd_kafka_error_t) -> RDKafkaError {
60        unsafe { RDKafkaError(NativePtr::from_ptr(ptr).map(Arc::new)) }
61    }
62
63    fn ptr(&self) -> *const rdsys::rd_kafka_error_t {
64        match &self.0 {
65            None => ptr::null(),
66            Some(p) => p.ptr(),
67        }
68    }
69
70    /// Returns the error code or [`RDKafkaErrorCode::NoError`] if the error is
71    /// null.
72    pub fn code(&self) -> RDKafkaErrorCode {
73        unsafe { rdsys::rd_kafka_error_code(self.ptr()).into() }
74    }
75
76    /// Returns the error code name, e.g., "ERR_UNKNOWN_MEMBER_ID" or an empty
77    /// string if the error is null.
78    pub fn name(&self) -> String {
79        let cstr = unsafe { rdsys::rd_kafka_error_name(self.ptr()) };
80        unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
81    }
82
83    /// Returns a human readable error string or an empty string if the error is
84    /// null.
85    pub fn string(&self) -> String {
86        let cstr = unsafe { rdsys::rd_kafka_error_string(self.ptr()) };
87        unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
88    }
89
90    /// Reports whether the error is a fatal error.
91    ///
92    /// A fatal error indicates that the client instance is no longer usable.
93    pub fn is_fatal(&self) -> bool {
94        unsafe { rdsys::rd_kafka_error_is_fatal(self.ptr()) != 0 }
95    }
96
97    /// Reports whether the operation that encountered the error can be retried.
98    pub fn is_retriable(&self) -> bool {
99        unsafe { rdsys::rd_kafka_error_is_retriable(self.ptr()) != 0 }
100    }
101
102    /// Reports whether the error is an abortable transaction error.
103    pub fn txn_requires_abort(&self) -> bool {
104        unsafe { rdsys::rd_kafka_error_txn_requires_abort(self.ptr()) != 0 }
105    }
106}
107
108impl PartialEq for RDKafkaError {
109    fn eq(&self, other: &RDKafkaError) -> bool {
110        self.code() == other.code()
111    }
112}
113
114impl Eq for RDKafkaError {}
115
116impl fmt::Debug for RDKafkaError {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        write!(f, "RDKafkaError({})", self)
119    }
120}
121
122impl fmt::Display for RDKafkaError {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        f.write_str(&self.string())
125    }
126}
127
128impl Error for RDKafkaError {}
129
130// TODO: consider using macro
131
132/// Represents all possible Kafka errors.
133///
134/// If applicable, check the underlying [`RDKafkaErrorCode`] to get details.
135#[derive(Clone, PartialEq, Eq)]
136#[non_exhaustive]
137pub enum KafkaError {
138    /// Creation of admin operation failed.
139    AdminOpCreation(String),
140    /// The admin operation itself failed.
141    AdminOp(RDKafkaErrorCode),
142    /// The client was dropped before the operation completed.
143    Canceled,
144    /// Invalid client configuration.
145    ClientConfig(RDKafkaConfRes, String, String, String),
146    /// Client creation failed.
147    ClientCreation(String),
148    /// Consumer commit failed.
149    ConsumerCommit(RDKafkaErrorCode),
150    /// Consumer queue close failed.
151    ConsumerQueueClose(RDKafkaErrorCode),
152    /// Flushing failed
153    Flush(RDKafkaErrorCode),
154    /// Global error.
155    Global(RDKafkaErrorCode),
156    /// Group list fetch failed.
157    GroupListFetch(RDKafkaErrorCode),
158    /// Message consumption failed.
159    MessageConsumption(RDKafkaErrorCode),
160    /// Message consumption failed with fatal error.
161    MessageConsumptionFatal(RDKafkaErrorCode),
162    /// Message production error.
163    MessageProduction(RDKafkaErrorCode),
164    /// Metadata fetch error.
165    MetadataFetch(RDKafkaErrorCode),
166    /// No message was received.
167    NoMessageReceived,
168    /// Unexpected null pointer
169    Nul(ffi::NulError),
170    /// Offset fetch failed.
171    OffsetFetch(RDKafkaErrorCode),
172    /// End of partition reached.
173    PartitionEOF(i32),
174    /// Pause/Resume failed.
175    PauseResume(String),
176    /// Rebalance failed.
177    Rebalance(RDKafkaErrorCode),
178    /// Seeking a partition failed.
179    Seek(String),
180    /// Setting partition offset failed.
181    SetPartitionOffset(RDKafkaErrorCode),
182    /// Offset store failed.
183    StoreOffset(RDKafkaErrorCode),
184    /// Subscription creation failed.
185    Subscription(String),
186    /// Transaction error.
187    Transaction(RDKafkaError),
188    /// Mock Cluster error
189    MockCluster(RDKafkaErrorCode),
190}
191
192impl fmt::Debug for KafkaError {
193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194        match self {
195            KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err),
196            KafkaError::AdminOpCreation(err) => {
197                write!(f, "KafkaError (Admin operation creation error: {})", err)
198            }
199            KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
200            KafkaError::ClientConfig(_, desc, key, value) => write!(
201                f,
202                "KafkaError (Client config error: {} {} {})",
203                desc, key, value
204            ),
205            KafkaError::ClientCreation(err) => {
206                write!(f, "KafkaError (Client creation error: {})", err)
207            }
208            KafkaError::ConsumerCommit(err) => {
209                write!(f, "KafkaError (Consumer commit error: {})", err)
210            }
211            KafkaError::ConsumerQueueClose(err) => {
212                write!(f, "KafkaError (Consumer queue close error: {})", err)
213            }
214            KafkaError::Flush(err) => write!(f, "KafkaError (Flush error: {})", err),
215            KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
216            KafkaError::GroupListFetch(err) => {
217                write!(f, "KafkaError (Group list fetch error: {})", err)
218            }
219            KafkaError::MessageConsumption(err) => {
220                write!(f, "KafkaError (Message consumption error: {})", err)
221            }
222            KafkaError::MessageConsumptionFatal(err) => {
223                write!(f, "(Fatal) KafkaError (Message consumption error: {})", err)
224            }
225            KafkaError::MessageProduction(err) => {
226                write!(f, "KafkaError (Message production error: {})", err)
227            }
228            KafkaError::MetadataFetch(err) => {
229                write!(f, "KafkaError (Metadata fetch error: {})", err)
230            }
231            KafkaError::NoMessageReceived => {
232                write!(f, "No message received within the given poll interval")
233            }
234            KafkaError::Nul(_) => write!(f, "FFI null error"),
235            KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
236            KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
237            KafkaError::PauseResume(err) => {
238                write!(f, "KafkaError (Pause/resume error: {})", err)
239            }
240            KafkaError::Rebalance(err) => write!(f, "KafkaError (Rebalance error: {})", err),
241            KafkaError::Seek(err) => write!(f, "KafkaError (Seek error: {})", err),
242            KafkaError::SetPartitionOffset(err) => {
243                write!(f, "KafkaError (Set partition offset error: {})", err)
244            }
245            KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err),
246            KafkaError::Subscription(err) => {
247                write!(f, "KafkaError (Subscription error: {})", err)
248            }
249            KafkaError::Transaction(err) => write!(f, "KafkaError (Transaction error: {})", err),
250            KafkaError::MockCluster(err) => write!(f, "KafkaError (Mock cluster error: {})", err),
251        }
252    }
253}
254
255impl fmt::Display for KafkaError {
256    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257        match self {
258            KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err),
259            KafkaError::AdminOpCreation(err) => {
260                write!(f, "Admin operation creation error: {}", err)
261            }
262            KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
263            KafkaError::ClientConfig(_, desc, key, value) => {
264                write!(f, "Client config error: {} {} {}", desc, key, value)
265            }
266            KafkaError::ClientCreation(err) => write!(f, "Client creation error: {}", err),
267            KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
268            KafkaError::ConsumerQueueClose(err) => write!(f, "Consumer queue close error: {}", err),
269            KafkaError::Flush(err) => write!(f, "Flush error: {}", err),
270            KafkaError::Global(err) => write!(f, "Global error: {}", err),
271            KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
272            KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
273            KafkaError::MessageConsumptionFatal(err) => {
274                write!(f, "(Fatal) Message consumption error: {}", err)
275            }
276            KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
277            KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
278            KafkaError::NoMessageReceived => {
279                write!(f, "No message received within the given poll interval")
280            }
281            KafkaError::Nul(_) => write!(f, "FFI nul error"),
282            KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
283            KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
284            KafkaError::PauseResume(err) => write!(f, "Pause/resume error: {}", err),
285            KafkaError::Rebalance(err) => write!(f, "Rebalance error: {}", err),
286            KafkaError::Seek(err) => write!(f, "Seek error: {}", err),
287            KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
288            KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
289            KafkaError::Subscription(err) => write!(f, "Subscription error: {}", err),
290            KafkaError::Transaction(err) => write!(f, "Transaction error: {}", err),
291            KafkaError::MockCluster(err) => write!(f, "Mock cluster error: {}", err),
292        }
293    }
294}
295
296impl Error for KafkaError {
297    fn source(&self) -> Option<&(dyn Error + 'static)> {
298        match self {
299            KafkaError::AdminOp(_) => None,
300            KafkaError::AdminOpCreation(_) => None,
301            KafkaError::Canceled => None,
302            KafkaError::ClientConfig(..) => None,
303            KafkaError::ClientCreation(_) => None,
304            KafkaError::ConsumerCommit(err) => Some(err),
305            KafkaError::ConsumerQueueClose(err) => Some(err),
306            KafkaError::Flush(err) => Some(err),
307            KafkaError::Global(err) => Some(err),
308            KafkaError::GroupListFetch(err) => Some(err),
309            KafkaError::MessageConsumption(err) => Some(err),
310            KafkaError::MessageConsumptionFatal(err) => Some(err),
311            KafkaError::MessageProduction(err) => Some(err),
312            KafkaError::MetadataFetch(err) => Some(err),
313            KafkaError::NoMessageReceived => None,
314            KafkaError::Nul(_) => None,
315            KafkaError::OffsetFetch(err) => Some(err),
316            KafkaError::PartitionEOF(_) => None,
317            KafkaError::PauseResume(_) => None,
318            KafkaError::Rebalance(err) => Some(err),
319            KafkaError::Seek(_) => None,
320            KafkaError::SetPartitionOffset(err) => Some(err),
321            KafkaError::StoreOffset(err) => Some(err),
322            KafkaError::Subscription(_) => None,
323            KafkaError::Transaction(err) => Some(err),
324            KafkaError::MockCluster(err) => Some(err),
325        }
326    }
327}
328
329impl From<ffi::NulError> for KafkaError {
330    fn from(err: ffi::NulError) -> KafkaError {
331        KafkaError::Nul(err)
332    }
333}
334
335impl KafkaError {
336    /// Returns the [`RDKafkaErrorCode`] underlying this error, if any.
337    #[allow(clippy::match_same_arms)]
338    pub fn rdkafka_error_code(&self) -> Option<RDKafkaErrorCode> {
339        match self {
340            KafkaError::AdminOp(_) => None,
341            KafkaError::AdminOpCreation(_) => None,
342            KafkaError::Canceled => None,
343            KafkaError::ClientConfig(..) => None,
344            KafkaError::ClientCreation(_) => None,
345            KafkaError::ConsumerCommit(err) => Some(*err),
346            KafkaError::ConsumerQueueClose(err) => Some(*err),
347            KafkaError::Flush(err) => Some(*err),
348            KafkaError::Global(err) => Some(*err),
349            KafkaError::GroupListFetch(err) => Some(*err),
350            KafkaError::MessageConsumption(err) => Some(*err),
351            KafkaError::MessageConsumptionFatal(err) => Some(*err),
352            KafkaError::MessageProduction(err) => Some(*err),
353            KafkaError::MetadataFetch(err) => Some(*err),
354            KafkaError::NoMessageReceived => None,
355            KafkaError::Nul(_) => None,
356            KafkaError::OffsetFetch(err) => Some(*err),
357            KafkaError::PartitionEOF(_) => None,
358            KafkaError::PauseResume(_) => None,
359            KafkaError::Rebalance(err) => Some(*err),
360            KafkaError::Seek(_) => None,
361            KafkaError::SetPartitionOffset(err) => Some(*err),
362            KafkaError::StoreOffset(err) => Some(*err),
363            KafkaError::Subscription(_) => None,
364            KafkaError::Transaction(err) => Some(err.code()),
365            KafkaError::MockCluster(err) => Some(*err),
366        }
367    }
368}