madsim_rdkafka/std/
client.rs

//! Common client functionality.
//!
//! In librdkafka parlance, a client is either a consumer or a producer. This
//! module's [`Client`] type provides the functionality that is common to both
//! consumers and producers.
//!
//! Typically you will not want to construct a client directly. Construct one of
//! the consumers in the [`consumer`] module or one of the producers in the
//! [`producer`] module instead.
//!
//! [`consumer`]: crate::consumer
//! [`producer`]: crate::producer

use std::convert::TryFrom;
use std::error::Error;
use std::ffi::{CStr, CString};
use std::mem::ManuallyDrop;
use std::os::raw::{c_char, c_void};
use std::ptr;
use std::slice;
use std::string::ToString;
use std::sync::Arc;

use libc::addrinfo;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
use crate::consumer::RebalanceProtocol;
use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
use crate::groups::GroupList;
use crate::log::{debug, error, info, trace, warn};
use crate::metadata::Metadata;
use crate::mocking::MockCluster;
use crate::statistics::Statistics;
use crate::util::{self, ErrBuf, KafkaDrop, NativePtr, Timeout};

/// Client-level context.
///
/// Each client (consumers and producers included) has a context object that can
/// be used to customize its behavior. Implementing `ClientContext` enables the
/// customization of methods common to all clients, while [`ProducerContext`]
/// and [`ConsumerContext`] are specific to producers and consumers. Refer to
/// the list of methods to see which callbacks can currently be overridden.
///
/// **Important**: implementations of `ClientContext` must be thread safe, as
/// they might be shared between multiple threads.
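///
/// # Example
///
/// A minimal sketch of a custom context, using only items defined in this
/// module: override the callbacks you care about and keep the defaults for
/// the rest. The `LoggingContext` type is illustrative.
///
/// ```ignore
/// struct LoggingContext;
///
/// impl ClientContext for LoggingContext {
///     // Route global errors to stderr instead of the `log` crate.
///     fn error(&self, error: KafkaError, reason: &str) {
///         eprintln!("librdkafka error: {}: {}", error, reason);
///     }
/// }
/// ```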
///
/// [`ConsumerContext`]: crate::consumer::ConsumerContext
/// [`ProducerContext`]: crate::producer::ProducerContext
pub trait ClientContext: Send + Sync + 'static {
    /// Whether to periodically refresh the SASL `OAUTHBEARER` token
    /// by calling [`ClientContext::generate_oauth_token`].
    ///
    /// If disabled, librdkafka's default token refresh callback is used
    /// instead.
    ///
    /// This parameter is only relevant when using the `OAUTHBEARER` SASL
    /// mechanism.
    fn enable_refresh_oauth_token(&self) -> bool {
        false
    }

    /// Receives log lines from librdkafka.
    ///
    /// The default implementation forwards the log lines to the appropriate
    /// [`log`] crate macro. Consult the [`RDKafkaLogLevel`] documentation for
    /// details about the log level mapping.
    ///
    /// [`log`]: https://docs.rs/log
    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
        match level {
            RDKafkaLogLevel::Emerg
            | RDKafkaLogLevel::Alert
            | RDKafkaLogLevel::Critical
            | RDKafkaLogLevel::Error => {
                error!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Warning => {
                warn!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Notice => {
                info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Info => {
                info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Debug => {
                debug!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
        }
    }

    /// Receives the decoded statistics of the librdkafka client. To enable, the
    /// `statistics.interval.ms` configuration parameter must be specified.
    ///
    /// The default implementation logs the statistics at the `info` log level.
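    ///
    /// For example, to emit statistics every ten seconds (a sketch that
    /// assumes a mutable [`ClientConfig`]):
    ///
    /// ```ignore
    /// config.set("statistics.interval.ms", "10000");
    /// ```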
    fn stats(&self, statistics: Statistics) {
        info!("Client stats: {:?}", statistics);
    }

    /// Receives the JSON-encoded statistics of the librdkafka client. To
    /// enable, the `statistics.interval.ms` configuration parameter must be
    /// specified.
    ///
    /// The default implementation calls [`ClientContext::stats`] with the
    /// decoded statistics, logging an error if the decoding fails.
    fn stats_raw(&self, statistics: &[u8]) {
        match serde_json::from_slice(statistics) {
            Ok(stats) => self.stats(stats),
            Err(e) => error!("Could not parse statistics JSON: {}", e),
        }
    }

    /// Receives global errors from the librdkafka client.
    ///
    /// The default implementation logs the error at the `error` log level.
    fn error(&self, error: KafkaError, reason: &str) {
        error!("librdkafka: {}: {}", error, reason);
    }

    /// Rewrites a broker address for DNS resolution.
    ///
    /// This method is invoked before performing DNS resolution on a broker
    /// address. The returned address is used in place of the original address.
    /// It is useful to allow connecting to a Kafka cluster over a tunnel (e.g.,
    /// SSH or AWS PrivateLink), where the broker addresses returned by the
    /// bootstrap server need to be rewritten to be routed through the tunnel.
    ///
    /// The default implementation returns the address unchanged.
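    ///
    /// # Example
    ///
    /// A minimal sketch of an override that forces every connection through a
    /// single local tunnel endpoint (the `localhost:19092` address is
    /// illustrative):
    ///
    /// ```ignore
    /// fn rewrite_broker_addr(&self, _addr: BrokerAddr) -> BrokerAddr {
    ///     BrokerAddr {
    ///         host: "localhost".to_string(),
    ///         port: "19092".to_string(),
    ///     }
    /// }
    /// ```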
    fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
        addr
    }

    /// Generates an OAuth token from the provided configuration.
    ///
    /// Override with an appropriate implementation when using the `OAUTHBEARER`
    /// SASL authentication mechanism. For this method to be called, you must
    /// also set [`ClientContext::enable_refresh_oauth_token`] to true.
    ///
    /// The `fmt::Display` implementation of the returned error must not
    /// generate a message with an embedded null character.
    ///
    /// The default implementation always returns an error and is meant to
    /// be overridden.
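    ///
    /// # Example
    ///
    /// A minimal sketch of an override that returns a statically configured
    /// token (the token value, principal, and lifetime are placeholders; a
    /// real implementation would obtain them from an identity provider):
    ///
    /// ```ignore
    /// fn generate_oauth_token(
    ///     &self,
    ///     _oauthbearer_config: Option<&str>,
    /// ) -> Result<OAuthToken, Box<dyn Error>> {
    ///     let now_ms = std::time::SystemTime::now()
    ///         .duration_since(std::time::UNIX_EPOCH)?
    ///         .as_millis() as i64;
    ///     Ok(OAuthToken {
    ///         token: "static-token".to_string(),
    ///         principal_name: "kafka-client".to_string(),
    ///         // Pretend the token is valid for one hour.
    ///         lifetime_ms: now_ms + 60 * 60 * 1000,
    ///     })
    /// }
    /// ```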
    fn generate_oauth_token(
        &self,
        _oauthbearer_config: Option<&str>,
    ) -> Result<OAuthToken, Box<dyn Error>> {
        Err("Default implementation of generate_oauth_token must be overridden".into())
    }

    // NOTE: when adding a new method, remember to add it to the
    // FutureProducerContext as well.
    // https://github.com/rust-lang/rfcs/pull/1406 will maybe help in the
    // future.
}

/// An empty [`ClientContext`] that can be used when no customizations are
/// needed.
///
/// Uses the default callback implementations provided by `ClientContext`.
#[derive(Clone, Debug, Default)]
pub struct DefaultClientContext;

impl ClientContext for DefaultClientContext {}

//
// ********** CLIENT **********
//

/// A native rdkafka-sys client. This struct shouldn't be used directly; use
/// the higher-level `Client` or the producers and consumers instead.
// TODO(benesch): this should be `pub(crate)`.
pub struct NativeClient {
    ptr: NativePtr<RDKafka>,
}

unsafe impl KafkaDrop for RDKafka {
    const TYPE: &'static str = "client";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_destroy;
}

// The library is completely thread safe, according to the documentation.
unsafe impl Sync for NativeClient {}
unsafe impl Send for NativeClient {}

impl NativeClient {
    /// Wraps a pointer to an RDKafka object and returns a new NativeClient.
    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafka) -> NativeClient {
        NativeClient {
            ptr: NativePtr::from_ptr(ptr).unwrap(),
        }
    }

    /// Returns the wrapped pointer to RDKafka.
    pub fn ptr(&self) -> *mut RDKafka {
        self.ptr.ptr()
    }

    pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol {
        let protocol = unsafe { rdsys::rd_kafka_rebalance_protocol(self.ptr()) };
        if protocol.is_null() {
            RebalanceProtocol::None
        } else {
            let protocol = unsafe { CStr::from_ptr(protocol) };
            match protocol.to_bytes() {
                b"NONE" => RebalanceProtocol::None,
                b"EAGER" => RebalanceProtocol::Eager,
                b"COOPERATIVE" => RebalanceProtocol::Cooperative,
                _ => unreachable!(),
            }
        }
    }
}

/// A low-level rdkafka client.
///
/// This type is the basis of the consumers and producers in the [`consumer`]
/// and [`producer`] modules, respectively.
///
/// Typically you do not want to construct a `Client` directly, but instead
/// construct a consumer or producer. A `Client` can be used, however, when
/// only access to cluster metadata and watermarks is required.
///
/// [`consumer`]: crate::consumer
/// [`producer`]: crate::producer
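///
/// # Example
///
/// A minimal sketch that builds a standalone `Client` for metadata access.
/// It mirrors the construction used in this module's tests; the
/// `localhost:9092` bootstrap server, the use of `Duration` as the timeout,
/// and the error handling are illustrative.
///
/// ```ignore
/// let mut config = ClientConfig::new();
/// config.set("bootstrap.servers", "localhost:9092");
/// let native_config = config.create_native_config()?;
/// let client = Client::new(
///     &config,
///     native_config,
///     RDKafkaType::RD_KAFKA_PRODUCER,
///     DefaultClientContext,
/// )?;
/// // Fetch metadata for all topics with a five-second timeout.
/// let metadata = client
///     .fetch_metadata(None, Duration::from_secs(5))
///     .await?;
/// ```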
pub struct Client<C: ClientContext = DefaultClientContext> {
    native: Arc<NativeClient>,
    context: Arc<C>,
}

impl<C: ClientContext> Client<C> {
    /// Creates a new `Client` given a configuration, a client type and a context.
    pub fn new(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        rd_kafka_type: RDKafkaType,
        context: C,
    ) -> KafkaResult<Client<C>> {
        Self::new_context_arc(config, native_config, rd_kafka_type, Arc::new(context))
    }

    /// Creates a new `Client` given a configuration, a client type and a context.
    pub(crate) fn new_context_arc(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        rd_kafka_type: RDKafkaType,
        context: Arc<C>,
    ) -> KafkaResult<Client<C>> {
        let mut err_buf = ErrBuf::new();
        unsafe {
            rdsys::rd_kafka_conf_set_opaque(
                native_config.ptr(),
                Arc::as_ptr(&context) as *mut c_void,
            )
        };
        unsafe {
            rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>));
            rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::<C>));
            rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::<C>));
            rd_kafka_conf_set_resolve_cb(native_config.ptr(), Some(native_resolve_cb::<C>));
        }
        // XXX(runji): This function exists in librdkafka, but is not yet exposed
        //             by rdkafka-sys, so we declare the binding here manually.
        extern "C" {
            fn rd_kafka_conf_set_resolve_cb(
                conf: *mut rdsys::rd_kafka_conf_t,
                resolve_cb: Option<
                    unsafe extern "C" fn(
                        node: *const c_char,
                        service: *const c_char,
                        hints: *const addrinfo,
                        res: *mut *mut addrinfo,
                        opaque: *mut c_void,
                    ) -> std::ffi::c_int,
                >,
            );
        }
        if context.enable_refresh_oauth_token() {
            unsafe {
                rdsys::rd_kafka_conf_enable_sasl_queue(native_config.ptr(), 1);
                rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb(
                    native_config.ptr(),
                    Some(native_oauth_refresh_cb::<C>),
                )
            };
        }

        let client_ptr = unsafe {
            let native_config = ManuallyDrop::new(native_config);
            rdsys::rd_kafka_new(
                rd_kafka_type,
                native_config.ptr(),
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            )
        };
        trace!("Create new librdkafka client {:p}", client_ptr);

        if client_ptr.is_null() {
            return Err(KafkaError::ClientCreation(err_buf.to_string()));
        }

        if context.enable_refresh_oauth_token() {
            if let Err(e) = unsafe {
                match rdsys::rd_kafka_sasl_background_callbacks_enable(client_ptr).as_mut() {
                    None => Ok(()),
                    Some(e) => Err(RDKafkaError::from_ptr(e)),
                }
            } {
                error!("Failed to enable SASL background callbacks: {}", e);
                unsafe { rdsys::rd_kafka_destroy(client_ptr) };
                return Err(KafkaError::ClientCreation(e.to_string()));
            }
        }

        unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };

        Ok(Client {
            native: Arc::new(unsafe { NativeClient::from_ptr(client_ptr) }),
            context,
        })
    }

    /// Returns a reference to the native rdkafka-sys client.
    pub fn native_client(&self) -> &NativeClient {
        &self.native
    }

    /// Returns a pointer to the native rdkafka-sys client.
    pub fn native_ptr(&self) -> *mut RDKafka {
        self.native.ptr.ptr()
    }

    /// Returns a reference to the context.
    pub fn context(&self) -> &Arc<C> {
        &self.context
    }

    /// Returns the metadata information for the specified topic, or for all topics in the cluster
    /// if no topic is specified.
    pub async fn fetch_metadata<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        let client = self.clone();
        let topic = topic.map(|t| t.to_string());
        let timeout = timeout.into();
        tokio::task::spawn_blocking(move || client.fetch_metadata_sync(topic.as_deref(), timeout))
            .await
            .unwrap()
    }

    fn fetch_metadata_sync<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        let mut metadata_ptr: *const RDKafkaMetadata = ptr::null_mut();
        let (flag, native_topic) = if let Some(topic_name) = topic {
            (0, Some(self.native_topic(topic_name)?))
        } else {
            (1, None)
        };
        trace!("Starting metadata fetch");
        let ret = unsafe {
            rdsys::rd_kafka_metadata(
                self.native_ptr(),
                flag,
                native_topic.map(|t| t.ptr()).unwrap_or_else(ptr::null_mut),
                &mut metadata_ptr as *mut *const RDKafkaMetadata,
                timeout.into().as_millis(),
            )
        };
        trace!("Metadata fetch completed");
        if ret.is_error() {
            return Err(KafkaError::MetadataFetch(ret.into()));
        }

        Ok(unsafe { Metadata::from_ptr(metadata_ptr) })
    }

    /// Returns the low and high watermarks for the specified topic and partition.
    pub async fn fetch_watermarks<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        let client = self.clone();
        let topic = topic.to_string();
        let timeout = timeout.into();
        tokio::task::spawn_blocking(move || {
            client.fetch_watermarks_sync(&topic, partition, timeout)
        })
        .await
        .unwrap()
    }

    fn fetch_watermarks_sync<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        let mut low = -1;
        let mut high = -1;
        let topic_c = CString::new(topic.to_string())?;
        let ret = unsafe {
            rdsys::rd_kafka_query_watermark_offsets(
                self.native_ptr(),
                topic_c.as_ptr(),
                partition,
                &mut low as *mut i64,
                &mut high as *mut i64,
                timeout.into().as_millis(),
            )
        };
        if ret.is_error() {
            return Err(KafkaError::MetadataFetch(ret.into()));
        }
        Ok((low, high))
426
427    /// Returns the cluster identifier option or None if the cluster identifier is null
    pub fn fetch_cluster_id<T: Into<Timeout>>(&self, timeout: T) -> Option<String> {
        let cluster_id =
            unsafe { rdsys::rd_kafka_clusterid(self.native_ptr(), timeout.into().as_millis()) };
        if cluster_id.is_null() {
            return None;
        }
        let buf = unsafe { CStr::from_ptr(cluster_id).to_bytes() };
        String::from_utf8(buf.to_vec()).ok()
    }

    /// Returns the group membership information for the given group. If no group is
    /// specified, all groups will be returned.
    pub async fn fetch_group_list<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        let client = self.clone();
        let group = group.map(|g| g.to_string());
        let timeout = timeout.into();
        tokio::task::spawn_blocking(move || client.fetch_group_list_sync(group.as_deref(), timeout))
            .await
            .unwrap()
    }

    fn fetch_group_list_sync<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        // Be careful not to let group_c be freed before the FFI call completes.
        let group_c = CString::new(group.map_or("".to_string(), ToString::to_string))?;
        let group_c_ptr = if group.is_some() {
            group_c.as_ptr()
        } else {
            ptr::null_mut()
        };
        let mut group_list_ptr: *const RDKafkaGroupList = ptr::null_mut();
        trace!("Starting group list fetch");
        let ret = unsafe {
            rdsys::rd_kafka_list_groups(
                self.native_ptr(),
                group_c_ptr,
                &mut group_list_ptr as *mut *const RDKafkaGroupList,
                timeout.into().as_millis(),
            )
        };
        trace!("Group list fetch completed");
        if ret.is_error() {
            return Err(KafkaError::GroupListFetch(ret.into()));
        }

        Ok(unsafe { GroupList::from_ptr(group_list_ptr) })
    }

    /// Returns the first fatal error set on this client instance, or `None` if
    /// no fatal error has occurred.
    ///
    /// This function is intended to be used with idempotent producers, where
    /// some errors must logically be considered fatal to retain consistency.
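    ///
    /// A minimal usage sketch (assumes `client` is an existing `Client`):
    ///
    /// ```ignore
    /// if let Some((code, reason)) = client.fatal_error() {
    ///     eprintln!("fatal error {}: {}", code, reason);
    /// }
    /// ```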
    pub fn fatal_error(&self) -> Option<(RDKafkaErrorCode, String)> {
        let mut err_buf = ErrBuf::new();
        let code = unsafe {
            rdsys::rd_kafka_fatal_error(self.native_ptr(), err_buf.as_mut_ptr(), err_buf.capacity())
        };
        if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
            None
        } else {
            Some((code.into(), err_buf.to_string()))
        }
    }

    /// If this client was configured with `test.mock.num.brokers`,
    /// this will return a [`MockCluster`] instance associated with this client,
    /// otherwise `None` is returned.
    ///
    /// [`MockCluster`]: crate::mocking::MockCluster
    pub fn mock_cluster(&self) -> Option<MockCluster<'_, C>> {
        MockCluster::from_client(self)
    }

    /// Returns a NativeTopic from the current client. The NativeTopic shouldn't outlive the client
    /// it was generated from.
    pub(crate) fn native_topic(&self, topic: &str) -> KafkaResult<NativeTopic> {
        let topic_c = CString::new(topic.to_string())?;
        Ok(unsafe {
            NativeTopic::from_ptr(rdsys::rd_kafka_topic_new(
                self.native_ptr(),
                topic_c.as_ptr(),
                ptr::null_mut(),
            ))
            .unwrap()
        })
    }

    /// Returns a NativeQueue from the current client. The NativeQueue shouldn't
    /// outlive the client it was generated from.
    pub(crate) fn new_native_queue(&self) -> NativeQueue {
        unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_new(self.native_ptr())).unwrap() }
    }

    pub(crate) fn consumer_queue(&self) -> Option<NativeQueue> {
        unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_consumer(self.native_ptr())) }
    }

    pub(crate) fn clone(&self) -> Self {
        Self {
            native: self.native.clone(),
            context: self.context.clone(),
        }
    }
}

pub(crate) type NativeTopic = NativePtr<RDKafkaTopic>;

unsafe impl KafkaDrop for RDKafkaTopic {
    const TYPE: &'static str = "native topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_destroy;
}

unsafe impl Send for NativeTopic {}
unsafe impl Sync for NativeTopic {}

pub(crate) type NativeQueue = NativePtr<RDKafkaQueue>;

unsafe impl KafkaDrop for RDKafkaQueue {
    const TYPE: &'static str = "queue";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_queue_destroy;
}

// The library is completely thread safe, according to the documentation.
unsafe impl Sync for NativeQueue {}
unsafe impl Send for NativeQueue {}

impl NativeQueue {
    pub fn poll<T: Into<Timeout>>(&self, t: T) -> *mut RDKafkaEvent {
        unsafe { rdsys::rd_kafka_queue_poll(self.ptr(), t.into().as_millis()) }
    }
}

pub(crate) unsafe extern "C" fn native_log_cb<C: ClientContext>(
    client: *const RDKafka,
    level: i32,
    fac: *const c_char,
    buf: *const c_char,
) {
    let fac = CStr::from_ptr(fac).to_string_lossy();
    let log_message = CStr::from_ptr(buf).to_string_lossy();

    let context = &mut *(rdsys::rd_kafka_opaque(client) as *mut C);
    context.log(
        RDKafkaLogLevel::from_int(level),
        fac.trim(),
        log_message.trim(),
    );
}

pub(crate) unsafe extern "C" fn native_stats_cb<C: ClientContext>(
    _conf: *mut RDKafka,
    json: *mut c_char,
    json_len: usize,
    opaque: *mut c_void,
) -> i32 {
    let context = &mut *(opaque as *mut C);
    context.stats_raw(slice::from_raw_parts(json as *mut u8, json_len));
    0 // librdkafka will free the json buffer
}

pub(crate) unsafe extern "C" fn native_error_cb<C: ClientContext>(
    _client: *mut RDKafka,
    err: i32,
    reason: *const c_char,
    opaque: *mut c_void,
) {
    let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t");
    let error = KafkaError::Global(err.into());
    let reason = CStr::from_ptr(reason).to_string_lossy();

    let context = &mut *(opaque as *mut C);
    context.error(error, reason.trim());
}

// Cherry-picked from Materialize.
// https://github.com/MaterializeInc/rust-rdkafka/commit/8ea07c4d2b96636ff093e670bc921892aee0d56a
pub(crate) unsafe extern "C" fn native_resolve_cb<C: ClientContext>(
    node: *const c_char,
    service: *const c_char,
    hints: *const addrinfo,
    res: *mut *mut addrinfo,
    opaque: *mut c_void,
) -> i32 {
    // XXX(runji): if either node or service is null, call `getaddrinfo` directly
    if node.is_null() || service.is_null() {
        return unsafe { libc::getaddrinfo(node, service, hints, res) };
    }

    // Convert host and port to Rust strings.
    let host = match CStr::from_ptr(node).to_str() {
        Ok(host) => host.into(),
        Err(_) => return libc::EAI_FAIL,
    };
    let port = match CStr::from_ptr(service).to_str() {
        Ok(port) => port.into(),
        Err(_) => return libc::EAI_FAIL,
    };

    // Apply the rewrite in the context.
    let context = &mut *(opaque as *mut C);
    let addr = context.rewrite_broker_addr(BrokerAddr { host, port });

    // Convert host and port back to C strings.
    let node = match CString::new(addr.host) {
        Ok(node) => node,
        Err(_) => return libc::EAI_FAIL,
    };
    let service = match CString::new(addr.port) {
        Ok(service) => service,
        Err(_) => return libc::EAI_FAIL,
    };

    // Perform DNS resolution.
    unsafe { libc::getaddrinfo(node.as_ptr(), service.as_ptr(), hints, res) }
}

/// Describes the address of a broker in a Kafka cluster.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct BrokerAddr {
    /// The host name.
    pub host: String,
    /// The port, either as a decimal number or the name of a service in
    /// the services database.
    pub port: String,
}

/// A generated OAuth token and its associated metadata.
///
/// When using the `OAUTHBEARER` SASL authentication method, this type is
/// returned from [`ClientContext::generate_oauth_token`]. The token and
/// principal name must not contain embedded null characters.
///
/// Specifying SASL extensions is not currently supported.
pub struct OAuthToken {
    /// The token value to set.
    pub token: String,
    /// The Kafka principal name associated with the token.
    pub principal_name: String,
    /// When the token expires, in milliseconds since the Unix epoch.
    pub lifetime_ms: i64,
}

pub(crate) unsafe extern "C" fn native_oauth_refresh_cb<C: ClientContext>(
    client: *mut RDKafka,
    oauthbearer_config: *const c_char,
    opaque: *mut c_void,
) {
    let res: Result<_, Box<dyn Error>> = (|| {
        let context = &mut *(opaque as *mut C);
        let oauthbearer_config = match oauthbearer_config.is_null() {
            true => None,
            false => Some(util::cstr_to_owned(oauthbearer_config)),
        };
        let token_info = context.generate_oauth_token(oauthbearer_config.as_deref())?;
        let token = CString::new(token_info.token)?;
        let principal_name = CString::new(token_info.principal_name)?;
        Ok((token, principal_name, token_info.lifetime_ms))
    })();
    match res {
        Ok((token, principal_name, lifetime_ms)) => {
            let mut err_buf = ErrBuf::new();
            let code = rdkafka_sys::rd_kafka_oauthbearer_set_token(
                client,
                token.as_ptr(),
                lifetime_ms,
                principal_name.as_ptr(),
                ptr::null_mut(),
                0,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            );
            if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
                debug!("successfully set refreshed OAuth token");
            } else {
                debug!(
                    "failed to set refreshed OAuth token (code {:?}): {}",
                    code, err_buf
                );
                rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, err_buf.as_mut_ptr());
            }
        }
        Err(e) => {
            debug!("failed to refresh OAuth token: {}", e);
            let message = match CString::new(e.to_string()) {
                Ok(message) => message,
                Err(e) => {
                    error!("error message generated while refreshing OAuth token has embedded null character: {}", e);
                    CString::new("error while refreshing OAuth token has embedded null character")
                        .expect("known to be a valid CString")
                }
            };
            rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, message.as_ptr());
        }
    }
}

#[cfg(test)]
mod tests {
    // Just call everything to check that nothing panics by default; behavior
    // is tested in the integration tests.

    use super::*;
    use crate::config::ClientConfig;

    #[test]
    fn test_client() {
        let config = ClientConfig::new();
        let native_config = config.create_native_config().unwrap();
        let client = Client::new(
            &config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            DefaultClientContext,
        )
        .unwrap();
        assert!(!client.native_ptr().is_null());
    }
}