// rdkafka/consumer/base_consumer.rs

//! Low-level consumers.

use std::ffi::{CStr, CString};
use std::mem::ManuallyDrop;
use std::os::raw::c_void;
use std::ptr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use log::{error, warn};
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, EventPollResult, NativeClient, NativeQueue};
use crate::config::{
    ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
};
use crate::consumer::{
    CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
    RebalanceProtocol,
};
use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
use crate::groups::GroupList;
use crate::log::trace;
use crate::message::{BorrowedMessage, Message};
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, NativePtr, Timeout};
29
/// A low-level consumer that requires manual polling.
///
/// This consumer must be periodically polled to make progress on rebalancing,
/// callbacks and to receive messages.
pub struct BaseConsumer<C = DefaultConsumerContext>
where
    C: ConsumerContext,
{
    client: Client<C>,
    // Queue polled for messages and events: the consumer queue when a
    // `group.id` is configured, otherwise the client's main queue (see `new`).
    queue: NativeQueue,
    // `group.id` from the config, if any; `Drop` only runs the close
    // handshake for consumers that joined a group.
    group_id: Option<String>,
    // Owns the closure handed to librdkafka as a raw pointer. Double `Box`
    // because `dyn Fn` is a fat pointer and cannot be passed as `*mut c_void`
    // directly; see `set_nonempty_callback`.
    nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
}
43
44impl FromClientConfig for BaseConsumer {
45    fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
46        BaseConsumer::from_config_and_context(config, DefaultConsumerContext)
47    }
48}
49
50/// Creates a new `BaseConsumer` starting from a `ClientConfig`.
51impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
52    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseConsumer<C>> {
53        BaseConsumer::new(config, config.create_native_config()?, context)
54    }
55}
56
57impl<C> BaseConsumer<C>
58where
59    C: ConsumerContext,
60{
61    pub(crate) fn new(
62        config: &ClientConfig,
63        native_config: NativeClientConfig,
64        context: C,
65    ) -> KafkaResult<BaseConsumer<C>> {
66        unsafe {
67            rdsys::rd_kafka_conf_set_events(
68                native_config.ptr(),
69                rdsys::RD_KAFKA_EVENT_REBALANCE
70                    | rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT
71                    | rdsys::RD_KAFKA_EVENT_STATS
72                    | rdsys::RD_KAFKA_EVENT_ERROR
73                    | rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH,
74            )
75        };
76        let client = Client::new(
77            config,
78            native_config,
79            RDKafkaType::RD_KAFKA_CONSUMER,
80            context,
81        )?;
82
83        let group_id = config.get("group.id").map(|s| s.to_string());
84        // If a group.id is not specified, we won't redirect the main queue to the consumer queue,
85        // allowing continued use of the consumer for fetching metadata and watermarks without the
86        // need to specify a group.id
87        let queue = if group_id.is_some() {
88            // Redirect rdkafka's main queue to the consumer queue so that we only need to listen
89            // to the consumer queue to observe events like rebalancings and stats.
90            unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) };
91            client.consumer_queue().ok_or_else(|| {
92                KafkaError::ClientCreation("rdkafka consumer queue not available".to_string())
93            })?
94        } else {
95            client.main_queue()
96        };
97
98        Ok(BaseConsumer {
99            client,
100            queue,
101            group_id,
102            nonempty_callback: None,
103        })
104    }
105
106    /// Polls the consumer for new messages.
107    ///
108    /// It won't block for more than the specified timeout. Use zero `Duration` for non-blocking
109    /// call. With no timeout it blocks until an event is received.
110    ///
111    /// This method should be called at regular intervals, even if no message is expected,
112    /// to serve any queued events waiting to be handled. This is especially important for
113    /// automatic consumer rebalance, as the rebalance function will be executed by the thread
114    /// calling the poll() function.
115    ///
116    /// # Lifetime
117    ///
118    /// The returned message lives in the memory of the consumer and cannot outlive it.
119    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
120        self.poll_queue(self.get_queue(), timeout).into()
121    }
122
123    pub(crate) fn poll_queue<T: Into<Timeout>>(
124        &self,
125        queue: &NativeQueue,
126        timeout: T,
127    ) -> EventPollResult<KafkaResult<BorrowedMessage<'_>>> {
128        let now = Instant::now();
129        let initial_timeout = timeout.into();
130        let mut timeout = initial_timeout;
131        let min_poll_interval = self.context().main_queue_min_poll_interval();
132        loop {
133            let op_timeout = std::cmp::min(timeout, min_poll_interval);
134            let maybe_event = self.client().poll_event(queue, op_timeout);
135            match maybe_event {
136                EventPollResult::Event(event) => {
137                    let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
138                    match evtype {
139                        rdsys::RD_KAFKA_EVENT_FETCH => {
140                            if let Some(result) = self.handle_fetch_event(event) {
141                                return EventPollResult::Event(result);
142                            }
143                        }
144                        rdsys::RD_KAFKA_EVENT_ERROR => {
145                            if let Some(err) = self.handle_error_event(event) {
146                                return EventPollResult::Event(Err(err));
147                            }
148                        }
149                        rdsys::RD_KAFKA_EVENT_REBALANCE => {
150                            self.handle_rebalance_event(event);
151                            if timeout != Timeout::Never {
152                                return EventPollResult::EventConsumed;
153                            }
154                        }
155                        rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => {
156                            self.handle_offset_commit_event(event);
157                            if timeout != Timeout::Never {
158                                return EventPollResult::EventConsumed;
159                            }
160                        }
161                        _ => {
162                            let evname = unsafe {
163                                let evname = rdsys::rd_kafka_event_name(event.ptr());
164                                CStr::from_ptr(evname).to_string_lossy()
165                            };
166                            warn!("Ignored event '{evname}' on consumer poll");
167                        }
168                    }
169                }
170                EventPollResult::None => {
171                    timeout = initial_timeout.saturating_sub(now.elapsed());
172                    if timeout.is_zero() {
173                        return EventPollResult::None;
174                    }
175                }
176                EventPollResult::EventConsumed => {
177                    timeout = initial_timeout.saturating_sub(now.elapsed());
178                    if timeout.is_zero() {
179                        return EventPollResult::EventConsumed;
180                    }
181                }
182            };
183        }
184    }
185
186    fn handle_fetch_event(
187        &self,
188        event: NativePtr<RDKafkaEvent>,
189    ) -> Option<KafkaResult<BorrowedMessage<'_>>> {
190        unsafe {
191            NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _)
192                .map(|ptr| BorrowedMessage::from_client(ptr, Arc::new(event), self.client()))
193        }
194    }
195
196    fn handle_rebalance_event(&self, event: NativePtr<RDKafkaEvent>) {
197        let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
198        match err {
199            rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
200            | rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => {
201                let tpl = unsafe {
202                    let native_tpl = rdsys::rd_kafka_event_topic_partition_list(event.ptr());
203                    TopicPartitionList::from_ptr(native_tpl)
204                };
205                // The TPL is owned by the Event and will be destroyed when the event is destroyed.
206                // Dropping it here will lead to double free.
207                let mut tpl = ManuallyDrop::new(tpl);
208                self.context().rebalance(self, err, &mut tpl);
209            }
210            _ => {
211                let err = unsafe {
212                    let err_name =
213                        rdsys::rd_kafka_err2name(rdsys::rd_kafka_event_error(event.ptr()));
214                    CStr::from_ptr(err_name).to_string_lossy()
215                };
216                warn!("invalid rebalance event: {err}");
217            }
218        }
219    }
220
221    fn handle_offset_commit_event(&self, event: NativePtr<RDKafkaEvent>) {
222        let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
223        let commit_error = if err.is_error() {
224            Err(KafkaError::ConsumerCommit(err.into()))
225        } else {
226            Ok(())
227        };
228
229        let offsets = unsafe { rdsys::rd_kafka_event_topic_partition_list(event.ptr()) };
230        if offsets.is_null() {
231            let tpl = TopicPartitionList::new();
232            self.context().commit_callback(commit_error, &tpl);
233        } else {
234            // The TPL is owned by the Event and will be destroyed when the event is destroyed.
235            // Dropping it here will lead to double free.
236            let tpl = ManuallyDrop::new(unsafe { TopicPartitionList::from_ptr(offsets) });
237            self.context().commit_callback(commit_error, &tpl);
238        }
239    }
240
241    fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) -> Option<KafkaError> {
242        let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
243        if rdkafka_err.is_error() {
244            if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
245                let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
246                let partition = unsafe { (*tp_ptr).partition };
247                unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
248                Some(KafkaError::PartitionEOF(partition))
249            } else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
250                Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
251            } else {
252                Some(KafkaError::MessageConsumption(rdkafka_err.into()))
253            }
254        } else {
255            None
256        }
257    }
258
259    /// Returns an iterator over the available messages.
260    ///
261    /// It repeatedly calls [`poll`](#method.poll) with no timeout.
262    ///
263    /// Note that it's also possible to iterate over the consumer directly.
264    ///
265    /// # Examples
266    ///
267    /// All these are equivalent and will receive messages without timing out.
268    ///
269    /// ```rust,no_run
270    /// # let consumer: rdkafka::consumer::BaseConsumer<_> = rdkafka::ClientConfig::new()
271    /// #    .create()
272    /// #    .unwrap();
273    /// #
274    /// loop {
275    ///   let message = consumer.poll(None);
276    ///   // Handle the message
277    /// }
278    /// ```
279    ///
280    /// ```rust,no_run
281    /// # let consumer: rdkafka::consumer::BaseConsumer<_> = rdkafka::ClientConfig::new()
282    /// #    .create()
283    /// #    .unwrap();
284    /// #
285    /// for message in consumer.iter() {
286    ///   // Handle the message
287    /// }
288    /// ```
289    ///
290    /// ```rust,no_run
291    /// # let consumer: rdkafka::consumer::BaseConsumer<_> = rdkafka::ClientConfig::new()
292    /// #    .create()
293    /// #    .unwrap();
294    /// #
295    /// for message in &consumer {
296    ///   // Handle the message
297    /// }
298    /// ```
299    pub fn iter(&self) -> Iter<'_, C> {
300        Iter(self)
301    }
302
303    pub(crate) fn get_queue(&self) -> &NativeQueue {
304        &self.queue
305    }
306
307    /// Splits messages for the specified partition into their own queue.
308    ///
309    /// If the `topic` or `partition` is invalid, returns `None`.
310    ///
311    /// After calling this method, newly-fetched messages for the specified
312    /// partition will be returned via [`PartitionQueue::poll`] rather than
313    /// [`BaseConsumer::poll`]. Note that there may be buffered messages for the
314    /// specified partition that will continue to be returned by
315    /// `BaseConsumer::poll`. For best results, call `split_partition_queue`
316    /// before the first call to `BaseConsumer::poll`.
317    ///
318    /// You must continue to call `BaseConsumer::poll`, even if no messages are
319    /// expected, to serve callbacks.
320    ///
321    /// Note that calling [`Consumer::assign`] will deactivate any existing
322    /// partition queues. You will need to call this method for every partition
323    /// that should be split after every call to `assign`.
324    ///
325    /// Beware that this method is implemented for `&Arc<Self>`, not `&self`.
326    /// You will need to wrap your consumer in an `Arc` in order to call this
327    /// method. This design permits moving the partition queue to another thread
328    /// while ensuring the partition queue does not outlive the consumer.
329    pub fn split_partition_queue(
330        self: &Arc<Self>,
331        topic: &str,
332        partition: i32,
333    ) -> Option<PartitionQueue<C>> {
334        let topic = match CString::new(topic) {
335            Ok(topic) => topic,
336            Err(_) => return None,
337        };
338        let queue = unsafe {
339            NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
340                self.client.native_ptr(),
341                topic.as_ptr(),
342                partition,
343            ))
344        };
345        queue.map(|queue| {
346            unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()) }
347            PartitionQueue::new(self.clone(), queue)
348        })
349    }
350
351    /// Close the queue used by a consumer.
352    /// Only exposed for advanced usage of this API and should not be used under normal circumstances.
353    pub fn close_queue(&self) -> KafkaResult<()> {
354        let err = unsafe {
355            RDKafkaError::from_ptr(rdsys::rd_kafka_consumer_close_queue(
356                self.client.native_ptr(),
357                self.queue.ptr(),
358            ))
359        };
360        if err.is_error() {
361            Err(KafkaError::ConsumerQueueClose(err.code()))
362        } else {
363            Ok(())
364        }
365    }
366
367    /// Returns true if the consumer is closed, else false.
368    pub fn closed(&self) -> bool {
369        unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
370    }
371
372    pub(crate) fn native_client(&self) -> &NativeClient {
373        self.client.native_client()
374    }
375
376    /// Sets a callback that will be invoked whenever the queue becomes
377    /// nonempty.
378    pub fn set_nonempty_callback<F>(&mut self, f: F)
379    where
380        F: Fn() + Send + Sync + 'static,
381    {
382        // SAFETY: we keep `F` alive until the next call to
383        // `rd_kafka_queue_cb_event_enable`. That might be the next call to
384        // `set_nonempty_callback` or it might be when the queue is dropped. The
385        // double indirection is required because `&dyn Fn` is a fat pointer.
386
387        unsafe extern "C" fn native_message_queue_nonempty_cb(
388            _: *mut RDKafka,
389            opaque_ptr: *mut c_void,
390        ) {
391            let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
392            (**f)();
393        }
394
395        let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
396        unsafe {
397            rdsys::rd_kafka_queue_cb_event_enable(
398                self.queue.ptr(),
399                Some(native_message_queue_nonempty_cb),
400                &*f as *const _ as *mut c_void,
401            )
402        }
403        self.nonempty_callback = Some(f);
404    }
405}
406
impl<C> Consumer<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    // Returns the underlying client.
    fn client(&self) -> &Client<C> {
        &self.client
    }

    // Fetches the consumer group metadata used for transactional produces;
    // `None` if the native call returns no metadata.
    fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
        let ptr = unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata(
                self.client.native_ptr(),
            ))
        }?;
        Some(ConsumerGroupMetadata(ptr))
    }

    // Subscribes to the given topics, replacing any previous subscription.
    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
        let mut tpl = TopicPartitionList::new();
        for topic in topics {
            // Partitions are assigned by the group coordinator, not here.
            tpl.add_topic_unassigned(topic);
        }
        let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    // Removes the current subscription. The native call's return code is
    // intentionally ignored (best-effort).
    fn unsubscribe(&self) {
        unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };
    }

    // Manually assigns the given partitions, bypassing group management.
    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), assignment.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    fn unassign(&self) -> KafkaResult<()> {
        // Passing null to assign clears the current static assignments list
        let ret_code = unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), ptr::null()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    // Adds partitions to the current assignment (cooperative rebalancing).
    fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_assign(
                self.client.native_ptr(),
                assignment.ptr(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    // Removes partitions from the current assignment (cooperative rebalancing).
    fn incremental_unassign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_unassign(
                self.client.native_ptr(),
                assignment.ptr(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    // Seeks a single partition to `offset`, blocking up to `timeout`.
    fn seek<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        offset: Offset,
        timeout: T,
    ) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let ret_code = match offset.to_raw() {
            Some(offset) => unsafe {
                rdsys::rd_kafka_seek(topic.ptr(), partition, offset, timeout.into().as_millis())
            },
            // Offsets like `OffsetTail` cannot be expressed to `rd_kafka_seek`.
            None => return Err(KafkaError::Seek("Local: Unrepresentable offset".into())),
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Seek(error));
        };
        Ok(())
    }

    // Seeks multiple partitions at once; per-partition errors are reported in
    // the returned list's error fields.
    fn seek_partitions<T: Into<Timeout>>(
        &self,
        topic_partition_list: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_seek_partitions(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                timeout.into().as_millis(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Seek(error));
        }
        Ok(topic_partition_list)
    }

    // Commits the given offsets, synchronously or asynchronously per `mode`.
    fn commit(
        &self,
        topic_partition_list: &TopicPartitionList,
        mode: CommitMode,
    ) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                // `CommitMode` maps directly to the native async flag.
                mode as i32,
            )
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    // Commits the consumer's current position for all assigned partitions
    // (null TPL means "use current assignment state").
    fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(self.client.native_ptr(), ptr::null_mut(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    // Commits the offset of a single message (i.e. message offset + 1).
    fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    // Stores `offset` for later auto/manual commit.
    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    // Stores the offset of `message` for later commit.
    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
        };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    // Stores the offsets in `tpl` for later commit.
    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
        let error = unsafe { rdsys::rd_kafka_offsets_store(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    // Returns the current subscription as a new, caller-owned TPL.
    fn subscription(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_subscription(self.client.native_ptr(), &mut tpl_ptr) };

        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            // On success the native call allocated `tpl_ptr`; take ownership.
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    // Returns the current assignment as a new, caller-owned TPL.
    fn assignment(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };

        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    // True if the current assignment was lost (e.g. due to a failed rebalance).
    fn assignment_lost(&self) -> bool {
        unsafe { rdsys::rd_kafka_assignment_lost(self.client.native_ptr()) == 1 }
    }

    // Fetches committed offsets for the partitions currently assigned.
    fn committed<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }

        self.committed_offsets(unsafe { TopicPartitionList::from_ptr(tpl_ptr) }, timeout)
    }

    // Fetches committed offsets for the partitions in `tpl`; the offsets are
    // written into `tpl` in place and it is returned on success.
    fn committed_offsets<T: Into<Timeout>>(
        &self,
        tpl: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let committed_error = unsafe {
            rdsys::rd_kafka_committed(
                self.client.native_ptr(),
                tpl.ptr(),
                timeout.into().as_millis(),
            )
        };

        if committed_error.is_error() {
            Err(KafkaError::MetadataFetch(committed_error.into()))
        } else {
            Ok(tpl)
        }
    }

    // Looks up the offsets at `timestamp` for all assigned partitions.
    fn offsets_for_timestamp<T: Into<Timeout>>(
        &self,
        timestamp: i64,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }
        let mut tpl = unsafe { TopicPartitionList::from_ptr(tpl_ptr) };

        // Set the timestamp we want in the offset field for every partition as
        // librdkafka expects.
        tpl.set_all_offsets(Offset::Offset(timestamp))?;

        self.offsets_for_times(tpl, timeout)
    }

    // `timestamps` is a `TopicPartitionList` with timestamps instead of
    // offsets.
    fn offsets_for_times<T: Into<Timeout>>(
        &self,
        timestamps: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        // This call will then put the offset in the offset field of this topic
        // partition list.
        let offsets_for_times_error = unsafe {
            rdsys::rd_kafka_offsets_for_times(
                self.client.native_ptr(),
                timestamps.ptr(),
                timeout.into().as_millis(),
            )
        };

        if offsets_for_times_error.is_error() {
            Err(KafkaError::MetadataFetch(offsets_for_times_error.into()))
        } else {
            Ok(timestamps)
        }
    }

    // Returns the consumer's current position (next offset to fetch) for each
    // assigned partition.
    fn position(&self) -> KafkaResult<TopicPartitionList> {
        let tpl = self.assignment()?;
        let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(tpl)
        }
    }

    // Delegates cluster metadata fetching to the client.
    fn fetch_metadata<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        self.client.fetch_metadata(topic, timeout)
    }

    // Delegates low/high watermark fetching to the client.
    fn fetch_watermarks<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        self.client.fetch_watermarks(topic, partition, timeout)
    }

    // Delegates consumer group listing to the client.
    fn fetch_group_list<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        self.client.fetch_group_list(group, timeout)
    }

    // Pauses fetching for the given partitions.
    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    // Resumes fetching for the given partitions.
    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code = unsafe {
            rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    // Reports which rebalance protocol (eager/cooperative) is in effect.
    fn rebalance_protocol(&self) -> RebalanceProtocol {
        self.client.native_client().rebalance_protocol()
    }
}
761
impl<C> Drop for BaseConsumer<C>
where
    C: ConsumerContext,
{
    fn drop(&mut self) {
        // Detach any nonempty callback first so librdkafka cannot invoke a
        // closure pointer that is about to be freed.
        unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }

        trace!("Destroying consumer: {:?}", self.client.native_ptr());
        // Only consumers configured with a group.id joined a group and need
        // the close handshake; group-less consumers skip it (see `new`).
        if self.group_id.is_some() {
            if let Err(err) = self.close_queue() {
                error!("Failed to close consumer queue on drop: {}", err);
            } else {
                // Keep serving events until librdkafka reports the close done.
                while !self.closed() {
                    self.poll(Duration::from_millis(100));
                }
            }
        }
        trace!("Consumer destroyed: {:?}", self.client.native_ptr());
    }
}
782
/// A convenience iterator over the messages in a [`BaseConsumer`].
///
/// Each call to [`Iter::next`] simply calls [`BaseConsumer::poll`] with an
/// infinite timeout.
pub struct Iter<'a, C>(&'a BaseConsumer<C>)
where
    C: ConsumerContext;
790
791impl<'a, C> Iterator for Iter<'a, C>
792where
793    C: ConsumerContext,
794{
795    type Item = KafkaResult<BorrowedMessage<'a>>;
796
797    fn next(&mut self) -> Option<Self::Item> {
798        loop {
799            if let Some(item) = self.0.poll(None) {
800                return Some(item);
801            }
802        }
803    }
804}
805
806impl<'a, C> IntoIterator for &'a BaseConsumer<C>
807where
808    C: ConsumerContext,
809{
810    type Item = KafkaResult<BorrowedMessage<'a>>;
811    type IntoIter = Iter<'a, C>;
812
813    fn into_iter(self) -> Self::IntoIter {
814        self.iter()
815    }
816}
817
/// A message queue for a single partition.
pub struct PartitionQueue<C>
where
    C: ConsumerContext,
{
    // Keeps the consumer alive so this queue cannot outlive it.
    consumer: Arc<BaseConsumer<C>>,
    pub(crate) queue: NativeQueue,
    // Owns the closure registered with librdkafka as a raw pointer; see
    // `set_nonempty_callback` for the ownership contract.
    nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
}
827
impl<C> PartitionQueue<C>
where
    C: ConsumerContext,
{
    // Wraps a native partition queue obtained from `split_partition_queue`.
    pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
        PartitionQueue {
            consumer,
            queue,
            nonempty_callback: None,
        }
    }

    /// Polls the partition for new messages.
    ///
    /// The `timeout` parameter controls how long to block if no messages are
    /// available.
    ///
    /// Remember that you must also call [`BaseConsumer::poll`] on the
    /// associated consumer regularly, even if no messages are expected, to
    /// serve events.
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        self.consumer.poll_queue(&self.queue, timeout).into()
    }

    /// Sets a callback that will be invoked whenever the queue becomes
    /// nonempty.
    pub fn set_nonempty_callback<F>(&mut self, f: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        // SAFETY: we keep `F` alive until the next call to
        // `rd_kafka_queue_cb_event_enable`. That might be the next call to
        // `set_nonempty_callback` or it might be when the queue is dropped. The
        // double indirection is required because `&dyn Fn` is a fat pointer.

        unsafe extern "C" fn native_message_queue_nonempty_cb(
            _: *mut RDKafka,
            opaque_ptr: *mut c_void,
        ) {
            let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
            (**f)();
        }

        let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
        unsafe {
            rdsys::rd_kafka_queue_cb_event_enable(
                self.queue.ptr(),
                Some(native_message_queue_nonempty_cb),
                &*f as *const _ as *mut c_void,
            )
        }
        // Store the box so the closure outlives the registration above.
        self.nonempty_callback = Some(f);
    }
}
882
impl<C> Drop for PartitionQueue<C>
where
    C: ConsumerContext,
{
    fn drop(&mut self) {
        // Deregister the nonempty callback before `nonempty_callback` (and the
        // closure it owns) is dropped, so librdkafka never calls a freed pointer.
        unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }
    }
}