1use std::ffi::{CStr, CString};
4use std::mem::ManuallyDrop;
5use std::os::raw::c_void;
6use std::ptr;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use log::{error, warn};
11use rdkafka_sys as rdsys;
12use rdkafka_sys::types::*;
13
14use crate::client::{Client, EventPollResult, NativeClient, NativeQueue};
15use crate::config::{
16 ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
17};
18use crate::consumer::{
19 CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
20 RebalanceProtocol,
21};
22use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
23use crate::groups::GroupList;
24use crate::log::trace;
25use crate::message::{BorrowedMessage, Message};
26use crate::metadata::Metadata;
27use crate::topic_partition_list::{Offset, TopicPartitionList};
28use crate::util::{cstr_to_owned, NativePtr, Timeout};
29
/// A low-level consumer that requires manual polling.
///
/// The consumer must be polled (see [`BaseConsumer::poll`]) to receive
/// messages and to service rebalance and offset-commit events.
pub struct BaseConsumer<C = DefaultConsumerContext>
where
    C: ConsumerContext,
{
    // The underlying librdkafka client.
    client: Client<C>,
    // The queue messages and events are polled from: the consumer group
    // queue when a `group.id` is configured, the client's main queue
    // otherwise (see `BaseConsumer::new`).
    queue: NativeQueue,
    // The configured `group.id`, if any; consulted on drop to decide
    // whether the consumer queue must be closed.
    group_id: Option<String>,
    // Keeps the registered queue-nonempty callback alive: a raw pointer to
    // the inner box is handed to librdkafka, so the outer box must not be
    // dropped while the callback is enabled.
    nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
}
43
44impl FromClientConfig for BaseConsumer {
45 fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
46 BaseConsumer::from_config_and_context(config, DefaultConsumerContext)
47 }
48}
49
50impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
52 fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseConsumer<C>> {
53 BaseConsumer::new(config, config.create_native_config()?, context)
54 }
55}
56
impl<C> BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Creates a new consumer from a client configuration, its native
    /// counterpart, and a consumer context.
    ///
    /// Enables the librdkafka event types this consumer handles, creates the
    /// underlying client, and selects the poll queue: the consumer group
    /// queue when `group.id` is configured, the main queue otherwise.
    pub(crate) fn new(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        context: C,
    ) -> KafkaResult<BaseConsumer<C>> {
        // Subscribe to the event types that `poll_queue` knows how to handle.
        unsafe {
            rdsys::rd_kafka_conf_set_events(
                native_config.ptr(),
                rdsys::RD_KAFKA_EVENT_REBALANCE
                    | rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT
                    | rdsys::RD_KAFKA_EVENT_STATS
                    | rdsys::RD_KAFKA_EVENT_ERROR
                    | rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH,
            )
        };
        let client = Client::new(
            config,
            native_config,
            RDKafkaType::RD_KAFKA_CONSUMER,
            context,
        )?;

        let group_id = config.get("group.id").map(|s| s.to_string());
        let queue = if group_id.is_some() {
            // Redirect the main queue to the consumer queue so that a single
            // queue carries both messages and consumer events.
            unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) };
            client.consumer_queue().ok_or_else(|| {
                KafkaError::ClientCreation("rdkafka consumer queue not available".to_string())
            })?
        } else {
            client.main_queue()
        };

        Ok(BaseConsumer {
            client,
            queue,
            group_id,
            nonempty_callback: None,
        })
    }

    /// Polls the consumer for new messages, waiting at most `timeout`.
    ///
    /// Returns `None` if the timeout expires with no message available;
    /// otherwise a message or an error.
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        self.poll_queue(self.get_queue(), timeout).into()
    }

    /// Polls `queue` for events until a fetch result or an error surfaces,
    /// an auxiliary event is consumed (with a finite timeout), or the
    /// overall `timeout` elapses.
    ///
    /// Polling is sliced into intervals no longer than the context's
    /// `main_queue_min_poll_interval` so background events keep being
    /// serviced while waiting.
    pub(crate) fn poll_queue<T: Into<Timeout>>(
        &self,
        queue: &NativeQueue,
        timeout: T,
    ) -> EventPollResult<KafkaResult<BorrowedMessage<'_>>> {
        let now = Instant::now();
        let initial_timeout = timeout.into();
        let mut timeout = initial_timeout;
        let min_poll_interval = self.context().main_queue_min_poll_interval();
        loop {
            let op_timeout = std::cmp::min(timeout, min_poll_interval);
            let maybe_event = self.client().poll_event(queue, op_timeout);
            match maybe_event {
                EventPollResult::Event(event) => {
                    let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
                    match evtype {
                        rdsys::RD_KAFKA_EVENT_FETCH => {
                            if let Some(result) = self.handle_fetch_event(event) {
                                return EventPollResult::Event(result);
                            }
                        }
                        rdsys::RD_KAFKA_EVENT_ERROR => {
                            // Not every error event maps to a reportable
                            // error; keep polling when it does not.
                            if let Some(err) = self.handle_error_event(event) {
                                return EventPollResult::Event(Err(err));
                            }
                        }
                        rdsys::RD_KAFKA_EVENT_REBALANCE => {
                            self.handle_rebalance_event(event);
                            // With a finite timeout, report that an event was
                            // consumed so the caller can react; with
                            // `Timeout::Never` we keep waiting for a message.
                            if timeout != Timeout::Never {
                                return EventPollResult::EventConsumed;
                            }
                        }
                        rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => {
                            self.handle_offset_commit_event(event);
                            if timeout != Timeout::Never {
                                return EventPollResult::EventConsumed;
                            }
                        }
                        _ => {
                            let evname = unsafe {
                                let evname = rdsys::rd_kafka_event_name(event.ptr());
                                CStr::from_ptr(evname).to_string_lossy()
                            };
                            warn!("Ignored event '{evname}' on consumer poll");
                        }
                    }
                }
                EventPollResult::None => {
                    // No event within this slice; shrink the remaining
                    // budget and stop once it is exhausted.
                    timeout = initial_timeout.saturating_sub(now.elapsed());
                    if timeout.is_zero() {
                        return EventPollResult::None;
                    }
                }
                EventPollResult::EventConsumed => {
                    timeout = initial_timeout.saturating_sub(now.elapsed());
                    if timeout.is_zero() {
                        return EventPollResult::EventConsumed;
                    }
                }
            };
        }
    }

    /// Extracts the next message from a fetch event, if any.
    ///
    /// Shared ownership of the event is attached to the returned message
    /// (via `Arc`) so the event-backed message data outlives this call.
    fn handle_fetch_event(
        &self,
        event: NativePtr<RDKafkaEvent>,
    ) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _)
                .map(|ptr| BorrowedMessage::from_client(ptr, Arc::new(event), self.client()))
        }
    }

    /// Dispatches a rebalance event to the context's rebalance callback.
    fn handle_rebalance_event(&self, event: NativePtr<RDKafkaEvent>) {
        let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
        match err {
            rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
            | rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => {
                let tpl = unsafe {
                    let native_tpl = rdsys::rd_kafka_event_topic_partition_list(event.ptr());
                    TopicPartitionList::from_ptr(native_tpl)
                };
                // The partition list is owned by the event; `ManuallyDrop`
                // prevents `TopicPartitionList` from freeing it here.
                let mut tpl = ManuallyDrop::new(tpl);
                self.context().rebalance(self, err, &mut tpl);
            }
            _ => {
                let err = unsafe {
                    let err_name =
                        rdsys::rd_kafka_err2name(rdsys::rd_kafka_event_error(event.ptr()));
                    CStr::from_ptr(err_name).to_string_lossy()
                };
                warn!("invalid rebalance event: {err}");
            }
        }
    }

    /// Dispatches an offset-commit event to the context's commit callback.
    fn handle_offset_commit_event(&self, event: NativePtr<RDKafkaEvent>) {
        let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
        let commit_error = if err.is_error() {
            Err(KafkaError::ConsumerCommit(err.into()))
        } else {
            Ok(())
        };

        let offsets = unsafe { rdsys::rd_kafka_event_topic_partition_list(event.ptr()) };
        if offsets.is_null() {
            // No offsets attached to the event; report with an empty list.
            let tpl = TopicPartitionList::new();
            self.context().commit_callback(commit_error, &tpl);
        } else {
            // The list is owned by the event; do not free it here.
            let tpl = ManuallyDrop::new(unsafe { TopicPartitionList::from_ptr(offsets) });
            self.context().commit_callback(commit_error, &tpl);
        }
    }

    /// Converts an error event into a `KafkaError`, or `None` when the
    /// event carries no error.
    fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) -> Option<KafkaError> {
        let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
        if rdkafka_err.is_error() {
            if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
                // NOTE(review): `tp_ptr` is dereferenced without a null
                // check — `rd_kafka_event_topic_partition` can return NULL;
                // confirm EOF events always carry a topic-partition.
                let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
                let partition = unsafe { (*tp_ptr).partition };
                unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
                Some(KafkaError::PartitionEOF(partition))
            } else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
                Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
            } else {
                Some(KafkaError::MessageConsumption(rdkafka_err.into()))
            }
        } else {
            None
        }
    }

    /// Returns an iterator over the messages of this consumer; each call to
    /// `next` blocks until a message is available.
    pub fn iter(&self) -> Iter<'_, C> {
        Iter(self)
    }

    /// Returns the queue this consumer polls from.
    pub(crate) fn get_queue(&self) -> &NativeQueue {
        &self.queue
    }

    /// Splits off the queue of the given partition so it can be polled
    /// independently of the main consumer queue.
    ///
    /// Returns `None` if the topic name contains an interior NUL byte or
    /// librdkafka does not know the partition.
    pub fn split_partition_queue(
        self: &Arc<Self>,
        topic: &str,
        partition: i32,
    ) -> Option<PartitionQueue<C>> {
        let topic = match CString::new(topic) {
            Ok(topic) => topic,
            Err(_) => return None,
        };
        let queue = unsafe {
            NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
                self.client.native_ptr(),
                topic.as_ptr(),
                partition,
            ))
        };
        queue.map(|queue| {
            // Disable forwarding so this partition's messages stay on the
            // split queue instead of the main consumer queue.
            unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()) }
            PartitionQueue::new(self.clone(), queue)
        })
    }

    /// Asks librdkafka to close the consumer queue, starting the shutdown
    /// of the consumer (see `Drop`, which polls until closed).
    pub fn close_queue(&self) -> KafkaResult<()> {
        let err = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_consumer_close_queue(
                self.client.native_ptr(),
                self.queue.ptr(),
            ))
        };
        if err.is_error() {
            Err(KafkaError::ConsumerQueueClose(err.code()))
        } else {
            Ok(())
        }
    }

    /// Reports whether the consumer has finished closing.
    pub fn closed(&self) -> bool {
        unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
    }

    /// Returns the native client handle.
    pub(crate) fn native_client(&self) -> &NativeClient {
        self.client.native_client()
    }

    /// Registers a callback invoked by librdkafka whenever the consumer
    /// queue transitions from empty to nonempty.
    ///
    /// Replaces any previously registered callback.
    pub fn set_nonempty_callback<F>(&mut self, f: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        // C-ABI trampoline: the opaque pointer is a thin pointer to a boxed
        // fat pointer (`Box<dyn Fn()>`), which is dereferenced twice to
        // reach the closure.
        unsafe extern "C" fn native_message_queue_nonempty_cb(
            _: *mut RDKafka,
            opaque_ptr: *mut c_void,
        ) {
            let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
            (**f)();
        }

        // Double box: the inner box erases the closure type, the outer box
        // gives a stable thin pointer we can pass through C.
        let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
        unsafe {
            rdsys::rd_kafka_queue_cb_event_enable(
                self.queue.ptr(),
                Some(native_message_queue_nonempty_cb),
                &*f as *const _ as *mut c_void,
            )
        }
        // Keep the boxes alive for as long as the callback is registered.
        self.nonempty_callback = Some(f);
    }
}
406
impl<C> Consumer<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Returns the underlying client.
    fn client(&self) -> &Client<C> {
        &self.client
    }

    /// Returns the consumer group metadata, if available.
    fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
        let ptr = unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata(
                self.client.native_ptr(),
            ))
        }?;
        Some(ConsumerGroupMetadata(ptr))
    }

    /// Subscribes to the given topics, replacing the current subscription.
    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
        let mut tpl = TopicPartitionList::new();
        for topic in topics {
            tpl.add_topic_unassigned(topic);
        }
        let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Unsubscribes from the current subscription.
    fn unsubscribe(&self) {
        unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };
    }

    /// Manually assigns the given partitions to this consumer.
    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), assignment.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Clears the current partition assignment (assigns a null list).
    fn unassign(&self) -> KafkaResult<()> {
        let ret_code = unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), ptr::null()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Incrementally adds partitions to the current assignment
    /// (cooperative rebalancing).
    fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_assign(
                self.client.native_ptr(),
                assignment.ptr(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Incrementally removes partitions from the current assignment
    /// (cooperative rebalancing).
    fn incremental_unassign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_unassign(
                self.client.native_ptr(),
                assignment.ptr(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Seeks the given partition of `topic` to `offset`.
    ///
    /// Fails with `KafkaError::Seek` if the offset has no raw
    /// representation or librdkafka reports an error.
    fn seek<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        offset: Offset,
        timeout: T,
    ) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let ret_code = match offset.to_raw() {
            Some(offset) => unsafe {
                rdsys::rd_kafka_seek(topic.ptr(), partition, offset, timeout.into().as_millis())
            },
            None => return Err(KafkaError::Seek("Local: Unrepresentable offset".into())),
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Seek(error));
        };
        Ok(())
    }

    /// Seeks every partition in the list to its listed offset, returning
    /// the list (with per-partition results filled in by librdkafka).
    fn seek_partitions<T: Into<Timeout>>(
        &self,
        topic_partition_list: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_seek_partitions(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                timeout.into().as_millis(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Seek(error));
        }
        Ok(topic_partition_list)
    }

    /// Commits the offsets in the given list, synchronously or
    /// asynchronously depending on `mode`.
    fn commit(
        &self,
        topic_partition_list: &TopicPartitionList,
        mode: CommitMode,
    ) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                mode as i32,
            )
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    /// Commits the consumer's current offsets (null offset list).
    fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(self.client.native_ptr(), ptr::null_mut(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    /// Commits the offset of the given message.
    fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    /// Stores `offset` for the given topic/partition for later commit.
    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    /// Stores the offset of the given message for later commit.
    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
        };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    /// Stores all the offsets in the given list for later commit.
    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
        let error = unsafe { rdsys::rd_kafka_offsets_store(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    /// Returns the current subscription as a topic partition list.
    fn subscription(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_subscription(self.client.native_ptr(), &mut tpl_ptr) };

        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            // On success librdkafka hands us ownership of the list.
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    /// Returns the current partition assignment.
    fn assignment(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };

        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    /// Reports whether the current assignment has been lost.
    fn assignment_lost(&self) -> bool {
        unsafe { rdsys::rd_kafka_assignment_lost(self.client.native_ptr()) == 1 }
    }

    /// Returns the committed offsets for the currently assigned partitions.
    fn committed<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }

        self.committed_offsets(unsafe { TopicPartitionList::from_ptr(tpl_ptr) }, timeout)
    }

    /// Returns the committed offsets for the partitions in `tpl`.
    fn committed_offsets<T: Into<Timeout>>(
        &self,
        tpl: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        // `rd_kafka_committed` fills the offsets into `tpl` in place.
        let committed_error = unsafe {
            rdsys::rd_kafka_committed(
                self.client.native_ptr(),
                tpl.ptr(),
                timeout.into().as_millis(),
            )
        };

        if committed_error.is_error() {
            Err(KafkaError::MetadataFetch(committed_error.into()))
        } else {
            Ok(tpl)
        }
    }

    /// Looks up the offsets at the given timestamp for all currently
    /// assigned partitions.
    fn offsets_for_timestamp<T: Into<Timeout>>(
        &self,
        timestamp: i64,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }
        let mut tpl = unsafe { TopicPartitionList::from_ptr(tpl_ptr) };

        // librdkafka's offsets-for-times API expects the target timestamp in
        // each entry's offset field.
        tpl.set_all_offsets(Offset::Offset(timestamp))?;

        self.offsets_for_times(tpl, timeout)
    }

    /// Looks up the offsets for the timestamps carried (in the offset
    /// field) by each entry of `timestamps`; the list is updated in place.
    fn offsets_for_times<T: Into<Timeout>>(
        &self,
        timestamps: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let offsets_for_times_error = unsafe {
            rdsys::rd_kafka_offsets_for_times(
                self.client.native_ptr(),
                timestamps.ptr(),
                timeout.into().as_millis(),
            )
        };

        if offsets_for_times_error.is_error() {
            Err(KafkaError::MetadataFetch(offsets_for_times_error.into()))
        } else {
            Ok(timestamps)
        }
    }

    /// Returns the current consume position for the assigned partitions.
    fn position(&self) -> KafkaResult<TopicPartitionList> {
        let tpl = self.assignment()?;
        let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(tpl)
        }
    }

    /// Fetches cluster metadata, optionally restricted to one topic.
    fn fetch_metadata<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        self.client.fetch_metadata(topic, timeout)
    }

    /// Fetches the low and high watermarks for a topic partition.
    fn fetch_watermarks<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        self.client.fetch_watermarks(topic, partition, timeout)
    }

    /// Fetches the list of consumer groups, optionally filtered by name.
    fn fetch_group_list<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        self.client.fetch_group_list(group, timeout)
    }

    /// Pauses consumption for the given partitions.
    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    /// Resumes consumption for the given partitions.
    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code = unsafe {
            rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    /// Returns the rebalance protocol currently in use.
    fn rebalance_protocol(&self) -> RebalanceProtocol {
        self.client.native_client().rebalance_protocol()
    }
}
761
impl<C> Drop for BaseConsumer<C>
where
    C: ConsumerContext,
{
    fn drop(&mut self) {
        // Unregister any nonempty callback before tearing down, so librdkafka
        // cannot invoke it once `nonempty_callback` is dropped.
        unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }

        trace!("Destroying consumer: {:?}", self.client.native_ptr());
        // Only group consumers need an orderly close; standalone consumers
        // use the main queue and have nothing to close.
        if self.group_id.is_some() {
            if let Err(err) = self.close_queue() {
                error!("Failed to close consumer queue on drop: {}", err);
            } else {
                // Keep polling so rebalance/offset-commit events are serviced
                // until librdkafka reports the consumer fully closed.
                while !self.closed() {
                    self.poll(Duration::from_millis(100));
                }
            }
        }
        trace!("Consumer destroyed: {:?}", self.client.native_ptr());
    }
}
782
/// A blocking iterator over the messages of a [`BaseConsumer`].
pub struct Iter<'a, C>(&'a BaseConsumer<C>)
where
    C: ConsumerContext;
790
791impl<'a, C> Iterator for Iter<'a, C>
792where
793 C: ConsumerContext,
794{
795 type Item = KafkaResult<BorrowedMessage<'a>>;
796
797 fn next(&mut self) -> Option<Self::Item> {
798 loop {
799 if let Some(item) = self.0.poll(None) {
800 return Some(item);
801 }
802 }
803 }
804}
805
806impl<'a, C> IntoIterator for &'a BaseConsumer<C>
807where
808 C: ConsumerContext,
809{
810 type Item = KafkaResult<BorrowedMessage<'a>>;
811 type IntoIter = Iter<'a, C>;
812
813 fn into_iter(self) -> Self::IntoIter {
814 self.iter()
815 }
816}
817
/// A message queue for a single partition, split off from a
/// [`BaseConsumer`] via `split_partition_queue`.
pub struct PartitionQueue<C>
where
    C: ConsumerContext,
{
    // Keeps the owning consumer alive while this partition queue exists.
    consumer: Arc<BaseConsumer<C>>,
    // The native queue for this partition.
    pub(crate) queue: NativeQueue,
    // Keeps the registered queue-nonempty callback alive: a raw pointer to
    // the inner box is handed to librdkafka (see `set_nonempty_callback`).
    nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
}
827
impl<C> PartitionQueue<C>
where
    C: ConsumerContext,
{
    /// Creates a partition queue tied to the given consumer.
    pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
        PartitionQueue {
            consumer,
            queue,
            nonempty_callback: None,
        }
    }

    /// Polls this partition's queue for new messages, waiting at most
    /// `timeout`. Returns `None` if the timeout expires with no message.
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        self.consumer.poll_queue(&self.queue, timeout).into()
    }

    /// Registers a callback invoked by librdkafka whenever this partition
    /// queue transitions from empty to nonempty.
    ///
    /// Replaces any previously registered callback.
    pub fn set_nonempty_callback<F>(&mut self, f: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        // C-ABI trampoline: the opaque pointer is a thin pointer to a boxed
        // fat pointer (`Box<dyn Fn()>`), dereferenced twice to reach the
        // closure.
        unsafe extern "C" fn native_message_queue_nonempty_cb(
            _: *mut RDKafka,
            opaque_ptr: *mut c_void,
        ) {
            let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
            (**f)();
        }

        // Double box: the inner box erases the closure type, the outer box
        // gives a stable thin pointer we can pass through C.
        let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
        unsafe {
            rdsys::rd_kafka_queue_cb_event_enable(
                self.queue.ptr(),
                Some(native_message_queue_nonempty_cb),
                &*f as *const _ as *mut c_void,
            )
        }
        // Keep the boxes alive for as long as the callback is registered.
        self.nonempty_callback = Some(f);
    }
}
882
impl<C> Drop for PartitionQueue<C>
where
    C: ConsumerContext,
{
    fn drop(&mut self) {
        // Unregister any nonempty callback before `nonempty_callback` is
        // dropped, so librdkafka cannot call into freed memory.
        unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }
    }
}