1use std::convert::TryFrom;
15use std::error::Error;
16use std::ffi::{CStr, CString};
17use std::mem::ManuallyDrop;
18use std::os::raw::{c_char, c_void};
19use std::ptr;
20use std::slice;
21use std::string::ToString;
22use std::sync::Arc;
23
24use libc::addrinfo;
25use rdkafka_sys as rdsys;
26use rdkafka_sys::types::*;
27
28use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
29use crate::consumer::RebalanceProtocol;
30use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
31use crate::groups::GroupList;
32use crate::log::{debug, error, info, trace, warn};
33use crate::metadata::Metadata;
34use crate::mocking::MockCluster;
35use crate::statistics::Statistics;
36use crate::util::{self, ErrBuf, KafkaDrop, NativePtr, Timeout};
37
38pub trait ClientContext: Send + Sync + 'static {
52 fn enable_refresh_oauth_token(&self) -> bool {
61 false
62 }
63
64 fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
72 match level {
73 RDKafkaLogLevel::Emerg
74 | RDKafkaLogLevel::Alert
75 | RDKafkaLogLevel::Critical
76 | RDKafkaLogLevel::Error => {
77 error!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
78 }
79 RDKafkaLogLevel::Warning => {
80 warn!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
81 }
82 RDKafkaLogLevel::Notice => {
83 info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
84 }
85 RDKafkaLogLevel::Info => {
86 info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
87 }
88 RDKafkaLogLevel::Debug => {
89 debug!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
90 }
91 }
92 }
93
94 fn stats(&self, statistics: Statistics) {
99 info!("Client stats: {:?}", statistics);
100 }
101
102 fn stats_raw(&self, statistics: &[u8]) {
109 match serde_json::from_slice(statistics) {
110 Ok(stats) => self.stats(stats),
111 Err(e) => error!("Could not parse statistics JSON: {}", e),
112 }
113 }
114
115 fn error(&self, error: KafkaError, reason: &str) {
119 error!("librdkafka: {}: {}", error, reason);
120 }
121
122 fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
132 addr
133 }
134
135 fn generate_oauth_token(
147 &self,
148 _oauthbearer_config: Option<&str>,
149 ) -> Result<OAuthToken, Box<dyn Error>> {
150 Err("Default implementation of generate_oauth_token must be overridden".into())
151 }
152
153 }
158
159#[derive(Clone, Debug, Default)]
164pub struct DefaultClientContext;
165
166impl ClientContext for DefaultClientContext {}
167
168pub struct NativeClient {
176 ptr: NativePtr<RDKafka>,
177}
178
179unsafe impl KafkaDrop for RDKafka {
180 const TYPE: &'static str = "client";
181 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_destroy;
182}
183
184unsafe impl Sync for NativeClient {}
186unsafe impl Send for NativeClient {}
187
188impl NativeClient {
189 pub(crate) unsafe fn from_ptr(ptr: *mut RDKafka) -> NativeClient {
191 NativeClient {
192 ptr: NativePtr::from_ptr(ptr).unwrap(),
193 }
194 }
195
196 pub fn ptr(&self) -> *mut RDKafka {
198 self.ptr.ptr()
199 }
200
201 pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol {
202 let protocol = unsafe { rdsys::rd_kafka_rebalance_protocol(self.ptr()) };
203 if protocol.is_null() {
204 RebalanceProtocol::None
205 } else {
206 let protocol = unsafe { CStr::from_ptr(protocol) };
207 match protocol.to_bytes() {
208 b"NONE" => RebalanceProtocol::None,
209 b"EAGER" => RebalanceProtocol::Eager,
210 b"COOPERATIVE" => RebalanceProtocol::Cooperative,
211 _ => unreachable!(),
212 }
213 }
214 }
215}
216
217pub struct Client<C: ClientContext = DefaultClientContext> {
229 native: Arc<NativeClient>,
230 context: Arc<C>,
231}
232
233impl<C: ClientContext> Client<C> {
234 pub fn new(
236 config: &ClientConfig,
237 native_config: NativeClientConfig,
238 rd_kafka_type: RDKafkaType,
239 context: C,
240 ) -> KafkaResult<Client<C>> {
241 Self::new_context_arc(config, native_config, rd_kafka_type, Arc::new(context))
242 }
243
244 pub(crate) fn new_context_arc(
246 config: &ClientConfig,
247 native_config: NativeClientConfig,
248 rd_kafka_type: RDKafkaType,
249 context: Arc<C>,
250 ) -> KafkaResult<Client<C>> {
251 let mut err_buf = ErrBuf::new();
252 unsafe {
253 rdsys::rd_kafka_conf_set_opaque(
254 native_config.ptr(),
255 Arc::as_ptr(&context) as *mut c_void,
256 )
257 };
258 unsafe {
259 rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>));
260 rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::<C>));
261 rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::<C>));
262 rd_kafka_conf_set_resolve_cb(native_config.ptr(), Some(native_resolve_cb::<C>));
263 }
264 extern "C" {
267 fn rd_kafka_conf_set_resolve_cb(
268 conf: *mut rdsys::rd_kafka_conf_t,
269 resolve_cb: Option<
270 unsafe extern "C" fn(
271 node: *const c_char,
272 service: *const c_char,
273 hints: *const addrinfo,
274 res: *mut *mut addrinfo,
275 opaque: *mut c_void,
276 ) -> std::ffi::c_int,
277 >,
278 );
279 }
280 if context.enable_refresh_oauth_token() {
281 unsafe {
282 rdsys::rd_kafka_conf_enable_sasl_queue(native_config.ptr(), 1);
283 rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb(
284 native_config.ptr(),
285 Some(native_oauth_refresh_cb::<C>),
286 )
287 };
288 }
289
290 let client_ptr = unsafe {
291 let native_config = ManuallyDrop::new(native_config);
292 rdsys::rd_kafka_new(
293 rd_kafka_type,
294 native_config.ptr(),
295 err_buf.as_mut_ptr(),
296 err_buf.capacity(),
297 )
298 };
299 trace!("Create new librdkafka client {:p}", client_ptr);
300
301 if client_ptr.is_null() {
302 return Err(KafkaError::ClientCreation(err_buf.to_string()));
303 }
304
305 if context.enable_refresh_oauth_token() {
306 if let Err(e) = unsafe {
307 match rdsys::rd_kafka_sasl_background_callbacks_enable(client_ptr).as_mut() {
308 None => Ok(()),
309 Some(e) => Err(RDKafkaError::from_ptr(e)),
310 }
311 } {
312 error!("Failed to enable SASL background callbacks: {}", e);
313 unsafe { rdsys::rd_kafka_destroy(client_ptr) };
314 return Err(KafkaError::ClientCreation(e.to_string()));
315 }
316 }
317
318 unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };
319
320 Ok(Client {
321 native: Arc::new(unsafe { NativeClient::from_ptr(client_ptr) }),
322 context,
323 })
324 }
325
326 pub fn native_client(&self) -> &NativeClient {
328 &self.native
329 }
330
331 pub fn native_ptr(&self) -> *mut RDKafka {
333 self.native.ptr.ptr()
334 }
335
336 pub fn context(&self) -> &Arc<C> {
338 &self.context
339 }
340
341 pub async fn fetch_metadata<T: Into<Timeout>>(
344 &self,
345 topic: Option<&str>,
346 timeout: T,
347 ) -> KafkaResult<Metadata> {
348 let client = self.clone();
349 let topic = topic.map(|t| t.to_string());
350 let timeout = timeout.into();
351 tokio::task::spawn_blocking(move || client.fetch_metadata_sync(topic.as_deref(), timeout))
352 .await
353 .unwrap()
354 }
355
356 fn fetch_metadata_sync<T: Into<Timeout>>(
357 &self,
358 topic: Option<&str>,
359 timeout: T,
360 ) -> KafkaResult<Metadata> {
361 let mut metadata_ptr: *const RDKafkaMetadata = ptr::null_mut();
362 let (flag, native_topic) = if let Some(topic_name) = topic {
363 (0, Some(self.native_topic(topic_name)?))
364 } else {
365 (1, None)
366 };
367 trace!("Starting metadata fetch");
368 let ret = unsafe {
369 rdsys::rd_kafka_metadata(
370 self.native_ptr(),
371 flag,
372 native_topic.map(|t| t.ptr()).unwrap_or_else(ptr::null_mut),
373 &mut metadata_ptr as *mut *const RDKafkaMetadata,
374 timeout.into().as_millis(),
375 )
376 };
377 trace!("Metadata fetch completed");
378 if ret.is_error() {
379 return Err(KafkaError::MetadataFetch(ret.into()));
380 }
381
382 Ok(unsafe { Metadata::from_ptr(metadata_ptr) })
383 }
384
385 pub async fn fetch_watermarks<T: Into<Timeout>>(
387 &self,
388 topic: &str,
389 partition: i32,
390 timeout: T,
391 ) -> KafkaResult<(i64, i64)> {
392 let client = self.clone();
393 let topic = topic.to_string();
394 let timeout = timeout.into();
395 tokio::task::spawn_blocking(move || {
396 client.fetch_watermarks_sync(&topic, partition, timeout)
397 })
398 .await
399 .unwrap()
400 }
401
402 fn fetch_watermarks_sync<T: Into<Timeout>>(
403 &self,
404 topic: &str,
405 partition: i32,
406 timeout: T,
407 ) -> KafkaResult<(i64, i64)> {
408 let mut low = -1;
409 let mut high = -1;
410 let topic_c = CString::new(topic.to_string())?;
411 let ret = unsafe {
412 rdsys::rd_kafka_query_watermark_offsets(
413 self.native_ptr(),
414 topic_c.as_ptr(),
415 partition,
416 &mut low as *mut i64,
417 &mut high as *mut i64,
418 timeout.into().as_millis(),
419 )
420 };
421 if ret.is_error() {
422 return Err(KafkaError::MetadataFetch(ret.into()));
423 }
424 Ok((low, high))
425 }
426
427 pub fn fetch_cluster_id<T: Into<Timeout>>(&self, timeout: T) -> Option<String> {
429 let cluster_id =
430 unsafe { rdsys::rd_kafka_clusterid(self.native_ptr(), timeout.into().as_millis()) };
431 if cluster_id.is_null() {
432 return None;
433 }
434 let buf = unsafe { CStr::from_ptr(cluster_id).to_bytes() };
435 String::from_utf8(buf.to_vec()).ok()
436 }
437
438 pub async fn fetch_group_list<T: Into<Timeout>>(
441 &self,
442 group: Option<&str>,
443 timeout: T,
444 ) -> KafkaResult<GroupList> {
445 let client = self.clone();
446 let group = group.map(|g| g.to_string());
447 let timeout = timeout.into();
448 tokio::task::spawn_blocking(move || client.fetch_group_list_sync(group.as_deref(), timeout))
449 .await
450 .unwrap()
451 }
452
453 fn fetch_group_list_sync<T: Into<Timeout>>(
454 &self,
455 group: Option<&str>,
456 timeout: T,
457 ) -> KafkaResult<GroupList> {
458 let group_c = CString::new(group.map_or("".to_string(), ToString::to_string))?;
460 let group_c_ptr = if group.is_some() {
461 group_c.as_ptr()
462 } else {
463 ptr::null_mut()
464 };
465 let mut group_list_ptr: *const RDKafkaGroupList = ptr::null_mut();
466 trace!("Starting group list fetch");
467 let ret = unsafe {
468 rdsys::rd_kafka_list_groups(
469 self.native_ptr(),
470 group_c_ptr,
471 &mut group_list_ptr as *mut *const RDKafkaGroupList,
472 timeout.into().as_millis(),
473 )
474 };
475 trace!("Group list fetch completed");
476 if ret.is_error() {
477 return Err(KafkaError::GroupListFetch(ret.into()));
478 }
479
480 Ok(unsafe { GroupList::from_ptr(group_list_ptr) })
481 }
482
483 pub fn fatal_error(&self) -> Option<(RDKafkaErrorCode, String)> {
489 let mut err_buf = ErrBuf::new();
490 let code = unsafe {
491 rdsys::rd_kafka_fatal_error(self.native_ptr(), err_buf.as_mut_ptr(), err_buf.capacity())
492 };
493 if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
494 None
495 } else {
496 Some((code.into(), err_buf.to_string()))
497 }
498 }
499
500 pub fn mock_cluster(&self) -> Option<MockCluster<'_, C>> {
506 MockCluster::from_client(self)
507 }
508
509 pub(crate) fn native_topic(&self, topic: &str) -> KafkaResult<NativeTopic> {
512 let topic_c = CString::new(topic.to_string())?;
513 Ok(unsafe {
514 NativeTopic::from_ptr(rdsys::rd_kafka_topic_new(
515 self.native_ptr(),
516 topic_c.as_ptr(),
517 ptr::null_mut(),
518 ))
519 .unwrap()
520 })
521 }
522
523 pub(crate) fn new_native_queue(&self) -> NativeQueue {
526 unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_new(self.native_ptr())).unwrap() }
527 }
528
529 pub(crate) fn consumer_queue(&self) -> Option<NativeQueue> {
530 unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_consumer(self.native_ptr())) }
531 }
532
533 pub(crate) fn clone(&self) -> Self {
534 Self {
535 native: self.native.clone(),
536 context: self.context.clone(),
537 }
538 }
539}
540
541pub(crate) type NativeTopic = NativePtr<RDKafkaTopic>;
542
543unsafe impl KafkaDrop for RDKafkaTopic {
544 const TYPE: &'static str = "native topic";
545 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_destroy;
546}
547
548unsafe impl Send for NativeTopic {}
549unsafe impl Sync for NativeTopic {}
550
551pub(crate) type NativeQueue = NativePtr<RDKafkaQueue>;
552
553unsafe impl KafkaDrop for RDKafkaQueue {
554 const TYPE: &'static str = "queue";
555 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_queue_destroy;
556}
557
558unsafe impl Sync for NativeQueue {}
560unsafe impl Send for NativeQueue {}
561
562impl NativeQueue {
563 pub fn poll<T: Into<Timeout>>(&self, t: T) -> *mut RDKafkaEvent {
564 unsafe { rdsys::rd_kafka_queue_poll(self.ptr(), t.into().as_millis()) }
565 }
566}
567
568pub(crate) unsafe extern "C" fn native_log_cb<C: ClientContext>(
569 client: *const RDKafka,
570 level: i32,
571 fac: *const c_char,
572 buf: *const c_char,
573) {
574 let fac = CStr::from_ptr(fac).to_string_lossy();
575 let log_message = CStr::from_ptr(buf).to_string_lossy();
576
577 let context = &mut *(rdsys::rd_kafka_opaque(client) as *mut C);
578 context.log(
579 RDKafkaLogLevel::from_int(level),
580 fac.trim(),
581 log_message.trim(),
582 );
583}
584
585pub(crate) unsafe extern "C" fn native_stats_cb<C: ClientContext>(
586 _conf: *mut RDKafka,
587 json: *mut c_char,
588 json_len: usize,
589 opaque: *mut c_void,
590) -> i32 {
591 let context = &mut *(opaque as *mut C);
592 context.stats_raw(slice::from_raw_parts(json as *mut u8, json_len));
593 0 }
595
596pub(crate) unsafe extern "C" fn native_error_cb<C: ClientContext>(
597 _client: *mut RDKafka,
598 err: i32,
599 reason: *const c_char,
600 opaque: *mut c_void,
601) {
602 let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t");
603 let error = KafkaError::Global(err.into());
604 let reason = CStr::from_ptr(reason).to_string_lossy();
605
606 let context = &mut *(opaque as *mut C);
607 context.error(error, reason.trim());
608}
609
/// Address-resolution callback registered with librdkafka.
///
/// Lets the context rewrite the host/port pair through
/// [`ClientContext::rewrite_broker_addr`] and then resolves the result with
/// `getaddrinfo`. Returns the `getaddrinfo` status, or `EAI_FAIL` when a
/// string cannot be converted.
///
/// # Safety
///
/// `node` and `service` must each be null or point to a valid NUL-terminated
/// string, `hints` and `res` must be acceptable to `getaddrinfo`, and
/// `opaque` must point to a live `C`.
pub(crate) unsafe extern "C" fn native_resolve_cb<C: ClientContext>(
    node: *const c_char,
    service: *const c_char,
    hints: *const addrinfo,
    res: *mut *mut addrinfo,
    opaque: *mut c_void,
) -> i32 {
    // Without both a host and a port there is nothing to rewrite; defer
    // straight to the system resolver.
    if node.is_null() || service.is_null() {
        return unsafe { libc::getaddrinfo(node, service, hints, res) };
    }

    // Convert host and port to owned Rust strings.
    let host = match CStr::from_ptr(node).to_str() {
        Ok(host) => host.into(),
        Err(_) => return libc::EAI_FAIL,
    };
    let port = match CStr::from_ptr(service).to_str() {
        Ok(port) => port.into(),
        Err(_) => return libc::EAI_FAIL,
    };

    // The context lives in an `Arc` that other code can reach at the same
    // time, so only a shared reference may be formed here.
    let context = &*(opaque as *const C);
    let addr = context.rewrite_broker_addr(BrokerAddr { host, port });

    // Convert the rewritten pair back to C strings; an interior NUL makes the
    // address unusable.
    let node = match CString::new(addr.host) {
        Ok(node) => node,
        Err(_) => return libc::EAI_FAIL,
    };
    let service = match CString::new(addr.port) {
        Ok(service) => service,
        Err(_) => return libc::EAI_FAIL,
    };

    unsafe { libc::getaddrinfo(node.as_ptr(), service.as_ptr(), hints, res) }
}
651
/// A host/port pair describing an address that is about to be resolved.
///
/// Both parts are kept as strings because that is how they are handed to the
/// resolve callback and to `getaddrinfo`.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct BrokerAddr {
    /// The host name (the `node` argument of the resolver).
    pub host: String,
    /// The port (the `service` argument of the resolver).
    pub port: String,
}
661
/// An OAuth token together with the metadata handed to
/// `rd_kafka_oauthbearer_set_token`.
pub struct OAuthToken {
    /// The token value.
    pub token: String,
    /// The principal name associated with the token.
    pub principal_name: String,
    /// The token lifetime passed to librdkafka.
    // NOTE(review): librdkafka documents this as the expiry time in
    // milliseconds since the Unix epoch; confirm.
    pub lifetime_ms: i64,
}
677
678pub(crate) unsafe extern "C" fn native_oauth_refresh_cb<C: ClientContext>(
679 client: *mut RDKafka,
680 oauthbearer_config: *const c_char,
681 opaque: *mut c_void,
682) {
683 let res: Result<_, Box<dyn Error>> = (|| {
684 let context = &mut *(opaque as *mut C);
685 let oauthbearer_config = match oauthbearer_config.is_null() {
686 true => None,
687 false => Some(util::cstr_to_owned(oauthbearer_config)),
688 };
689 let token_info = context.generate_oauth_token(oauthbearer_config.as_deref())?;
690 let token = CString::new(token_info.token)?;
691 let principal_name = CString::new(token_info.principal_name)?;
692 Ok((token, principal_name, token_info.lifetime_ms))
693 })();
694 match res {
695 Ok((token, principal_name, lifetime_ms)) => {
696 let mut err_buf = ErrBuf::new();
697 let code = rdkafka_sys::rd_kafka_oauthbearer_set_token(
698 client,
699 token.as_ptr(),
700 lifetime_ms,
701 principal_name.as_ptr(),
702 ptr::null_mut(),
703 0,
704 err_buf.as_mut_ptr(),
705 err_buf.capacity(),
706 );
707 if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
708 debug!("successfully set refreshed OAuth token");
709 } else {
710 debug!(
711 "failed to set refreshed OAuth token (code {:?}): {}",
712 code, err_buf
713 );
714 rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, err_buf.as_mut_ptr());
715 }
716 }
717 Err(e) => {
718 debug!("failed to refresh OAuth token: {}", e);
719 let message = match CString::new(e.to_string()) {
720 Ok(message) => message,
721 Err(e) => {
722 error!("error message generated while refreshing OAuth token has embedded null character: {}", e);
723 CString::new("error while refreshing OAuth token has embedded null character")
724 .expect("known to be a valid CString")
725 }
726 };
727 rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, message.as_ptr());
728 }
729 }
730}
731
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ClientConfig;

    /// Smoke test: a producer client built from a default configuration
    /// exposes a non-null native pointer.
    #[test]
    fn test_client() {
        let config = ClientConfig::new();
        let native_config = config.create_native_config().unwrap();

        let creation = Client::new(
            &config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            DefaultClientContext,
        );
        let client = creation.unwrap();

        assert!(!client.native_ptr().is_null());
    }
}