1use std::error::Error;
4use std::ffi::{self, CStr};
5use std::fmt;
6use std::ptr;
7use std::sync::Arc;
8
9use rdkafka_sys as rdsys;
10use rdkafka_sys::types::*;
11
12use crate::util::{KafkaDrop, NativePtr};
13
14pub use rdsys::types::RDKafkaErrorCode;
16
17pub type KafkaResult<T> = Result<T, KafkaError>;
19
/// Implemented by status-like values that can tell whether they describe a
/// failure.
pub trait IsError {
    /// Returns `true` when the value represents an error condition and
    /// `false` otherwise.
    fn is_error(&self) -> bool;
}
27
28impl IsError for RDKafkaRespErr {
29 fn is_error(&self) -> bool {
30 *self != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR
31 }
32}
33
34impl IsError for RDKafkaConfRes {
35 fn is_error(&self) -> bool {
36 *self != RDKafkaConfRes::RD_KAFKA_CONF_OK
37 }
38}
39
40impl IsError for RDKafkaError {
41 fn is_error(&self) -> bool {
42 self.0.is_some()
43 }
44}
45
46#[derive(Clone)]
48pub struct RDKafkaError(Option<Arc<NativePtr<rdsys::rd_kafka_error_t>>>);
49
50unsafe impl KafkaDrop for rdsys::rd_kafka_error_t {
51 const TYPE: &'static str = "error";
52 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_error_destroy;
53}
54
55unsafe impl Send for RDKafkaError {}
56unsafe impl Sync for RDKafkaError {}
57
58impl RDKafkaError {
59 pub(crate) unsafe fn from_ptr(ptr: *mut rdsys::rd_kafka_error_t) -> RDKafkaError {
60 unsafe { RDKafkaError(NativePtr::from_ptr(ptr).map(Arc::new)) }
61 }
62
63 fn ptr(&self) -> *const rdsys::rd_kafka_error_t {
64 match &self.0 {
65 None => ptr::null(),
66 Some(p) => p.ptr(),
67 }
68 }
69
70 pub fn code(&self) -> RDKafkaErrorCode {
73 unsafe { rdsys::rd_kafka_error_code(self.ptr()).into() }
74 }
75
76 pub fn name(&self) -> String {
79 let cstr = unsafe { rdsys::rd_kafka_error_name(self.ptr()) };
80 unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
81 }
82
83 pub fn string(&self) -> String {
86 let cstr = unsafe { rdsys::rd_kafka_error_string(self.ptr()) };
87 unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
88 }
89
90 pub fn is_fatal(&self) -> bool {
94 unsafe { rdsys::rd_kafka_error_is_fatal(self.ptr()) != 0 }
95 }
96
97 pub fn is_retriable(&self) -> bool {
99 unsafe { rdsys::rd_kafka_error_is_retriable(self.ptr()) != 0 }
100 }
101
102 pub fn txn_requires_abort(&self) -> bool {
104 unsafe { rdsys::rd_kafka_error_txn_requires_abort(self.ptr()) != 0 }
105 }
106}
107
108impl PartialEq for RDKafkaError {
109 fn eq(&self, other: &RDKafkaError) -> bool {
110 self.code() == other.code()
111 }
112}
113
114impl Eq for RDKafkaError {}
115
116impl fmt::Debug for RDKafkaError {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 write!(f, "RDKafkaError({})", self)
119 }
120}
121
122impl fmt::Display for RDKafkaError {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.write_str(&self.string())
125 }
126}
127
128impl Error for RDKafkaError {}
129
130#[derive(Clone, PartialEq, Eq)]
136#[non_exhaustive]
137pub enum KafkaError {
138 AdminOpCreation(String),
140 AdminOp(RDKafkaErrorCode),
142 Canceled,
144 ClientConfig(RDKafkaConfRes, String, String, String),
146 ClientCreation(String),
148 ConsumerCommit(RDKafkaErrorCode),
150 ConsumerQueueClose(RDKafkaErrorCode),
152 Flush(RDKafkaErrorCode),
154 Global(RDKafkaErrorCode),
156 GroupListFetch(RDKafkaErrorCode),
158 MessageConsumption(RDKafkaErrorCode),
160 MessageConsumptionFatal(RDKafkaErrorCode),
162 MessageProduction(RDKafkaErrorCode),
164 MetadataFetch(RDKafkaErrorCode),
166 NoMessageReceived,
168 Nul(ffi::NulError),
170 OffsetFetch(RDKafkaErrorCode),
172 PartitionEOF(i32),
174 PauseResume(String),
176 Rebalance(RDKafkaErrorCode),
178 Seek(String),
180 SetPartitionOffset(RDKafkaErrorCode),
182 StoreOffset(RDKafkaErrorCode),
184 Subscription(String),
186 Transaction(RDKafkaError),
188 MockCluster(RDKafkaErrorCode),
190}
191
192impl fmt::Debug for KafkaError {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 match self {
195 KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err),
196 KafkaError::AdminOpCreation(err) => {
197 write!(f, "KafkaError (Admin operation creation error: {})", err)
198 }
199 KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
200 KafkaError::ClientConfig(_, desc, key, value) => write!(
201 f,
202 "KafkaError (Client config error: {} {} {})",
203 desc, key, value
204 ),
205 KafkaError::ClientCreation(err) => {
206 write!(f, "KafkaError (Client creation error: {})", err)
207 }
208 KafkaError::ConsumerCommit(err) => {
209 write!(f, "KafkaError (Consumer commit error: {})", err)
210 }
211 KafkaError::ConsumerQueueClose(err) => {
212 write!(f, "KafkaError (Consumer queue close error: {})", err)
213 }
214 KafkaError::Flush(err) => write!(f, "KafkaError (Flush error: {})", err),
215 KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
216 KafkaError::GroupListFetch(err) => {
217 write!(f, "KafkaError (Group list fetch error: {})", err)
218 }
219 KafkaError::MessageConsumption(err) => {
220 write!(f, "KafkaError (Message consumption error: {})", err)
221 }
222 KafkaError::MessageConsumptionFatal(err) => {
223 write!(f, "(Fatal) KafkaError (Message consumption error: {})", err)
224 }
225 KafkaError::MessageProduction(err) => {
226 write!(f, "KafkaError (Message production error: {})", err)
227 }
228 KafkaError::MetadataFetch(err) => {
229 write!(f, "KafkaError (Metadata fetch error: {})", err)
230 }
231 KafkaError::NoMessageReceived => {
232 write!(f, "No message received within the given poll interval")
233 }
234 KafkaError::Nul(_) => write!(f, "FFI null error"),
235 KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
236 KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
237 KafkaError::PauseResume(err) => {
238 write!(f, "KafkaError (Pause/resume error: {})", err)
239 }
240 KafkaError::Rebalance(err) => write!(f, "KafkaError (Rebalance error: {})", err),
241 KafkaError::Seek(err) => write!(f, "KafkaError (Seek error: {})", err),
242 KafkaError::SetPartitionOffset(err) => {
243 write!(f, "KafkaError (Set partition offset error: {})", err)
244 }
245 KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err),
246 KafkaError::Subscription(err) => {
247 write!(f, "KafkaError (Subscription error: {})", err)
248 }
249 KafkaError::Transaction(err) => write!(f, "KafkaError (Transaction error: {})", err),
250 KafkaError::MockCluster(err) => write!(f, "KafkaError (Mock cluster error: {})", err),
251 }
252 }
253}
254
255impl fmt::Display for KafkaError {
256 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257 match self {
258 KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err),
259 KafkaError::AdminOpCreation(err) => {
260 write!(f, "Admin operation creation error: {}", err)
261 }
262 KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
263 KafkaError::ClientConfig(_, desc, key, value) => {
264 write!(f, "Client config error: {} {} {}", desc, key, value)
265 }
266 KafkaError::ClientCreation(err) => write!(f, "Client creation error: {}", err),
267 KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
268 KafkaError::ConsumerQueueClose(err) => write!(f, "Consumer queue close error: {}", err),
269 KafkaError::Flush(err) => write!(f, "Flush error: {}", err),
270 KafkaError::Global(err) => write!(f, "Global error: {}", err),
271 KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
272 KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
273 KafkaError::MessageConsumptionFatal(err) => {
274 write!(f, "(Fatal) Message consumption error: {}", err)
275 }
276 KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
277 KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
278 KafkaError::NoMessageReceived => {
279 write!(f, "No message received within the given poll interval")
280 }
281 KafkaError::Nul(_) => write!(f, "FFI nul error"),
282 KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
283 KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
284 KafkaError::PauseResume(err) => write!(f, "Pause/resume error: {}", err),
285 KafkaError::Rebalance(err) => write!(f, "Rebalance error: {}", err),
286 KafkaError::Seek(err) => write!(f, "Seek error: {}", err),
287 KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
288 KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
289 KafkaError::Subscription(err) => write!(f, "Subscription error: {}", err),
290 KafkaError::Transaction(err) => write!(f, "Transaction error: {}", err),
291 KafkaError::MockCluster(err) => write!(f, "Mock cluster error: {}", err),
292 }
293 }
294}
295
296impl Error for KafkaError {
297 fn source(&self) -> Option<&(dyn Error + 'static)> {
298 match self {
299 KafkaError::AdminOp(_) => None,
300 KafkaError::AdminOpCreation(_) => None,
301 KafkaError::Canceled => None,
302 KafkaError::ClientConfig(..) => None,
303 KafkaError::ClientCreation(_) => None,
304 KafkaError::ConsumerCommit(err) => Some(err),
305 KafkaError::ConsumerQueueClose(err) => Some(err),
306 KafkaError::Flush(err) => Some(err),
307 KafkaError::Global(err) => Some(err),
308 KafkaError::GroupListFetch(err) => Some(err),
309 KafkaError::MessageConsumption(err) => Some(err),
310 KafkaError::MessageConsumptionFatal(err) => Some(err),
311 KafkaError::MessageProduction(err) => Some(err),
312 KafkaError::MetadataFetch(err) => Some(err),
313 KafkaError::NoMessageReceived => None,
314 KafkaError::Nul(_) => None,
315 KafkaError::OffsetFetch(err) => Some(err),
316 KafkaError::PartitionEOF(_) => None,
317 KafkaError::PauseResume(_) => None,
318 KafkaError::Rebalance(err) => Some(err),
319 KafkaError::Seek(_) => None,
320 KafkaError::SetPartitionOffset(err) => Some(err),
321 KafkaError::StoreOffset(err) => Some(err),
322 KafkaError::Subscription(_) => None,
323 KafkaError::Transaction(err) => Some(err),
324 KafkaError::MockCluster(err) => Some(err),
325 }
326 }
327}
328
329impl From<ffi::NulError> for KafkaError {
330 fn from(err: ffi::NulError) -> KafkaError {
331 KafkaError::Nul(err)
332 }
333}
334
335impl KafkaError {
336 #[allow(clippy::match_same_arms)]
338 pub fn rdkafka_error_code(&self) -> Option<RDKafkaErrorCode> {
339 match self {
340 KafkaError::AdminOp(_) => None,
341 KafkaError::AdminOpCreation(_) => None,
342 KafkaError::Canceled => None,
343 KafkaError::ClientConfig(..) => None,
344 KafkaError::ClientCreation(_) => None,
345 KafkaError::ConsumerCommit(err) => Some(*err),
346 KafkaError::ConsumerQueueClose(err) => Some(*err),
347 KafkaError::Flush(err) => Some(*err),
348 KafkaError::Global(err) => Some(*err),
349 KafkaError::GroupListFetch(err) => Some(*err),
350 KafkaError::MessageConsumption(err) => Some(*err),
351 KafkaError::MessageConsumptionFatal(err) => Some(*err),
352 KafkaError::MessageProduction(err) => Some(*err),
353 KafkaError::MetadataFetch(err) => Some(*err),
354 KafkaError::NoMessageReceived => None,
355 KafkaError::Nul(_) => None,
356 KafkaError::OffsetFetch(err) => Some(*err),
357 KafkaError::PartitionEOF(_) => None,
358 KafkaError::PauseResume(_) => None,
359 KafkaError::Rebalance(err) => Some(*err),
360 KafkaError::Seek(_) => None,
361 KafkaError::SetPartitionOffset(err) => Some(*err),
362 KafkaError::StoreOffset(err) => Some(*err),
363 KafkaError::Subscription(_) => None,
364 KafkaError::Transaction(err) => Some(err.code()),
365 KafkaError::MockCluster(err) => Some(*err),
366 }
367 }
368}