1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
3use std::fmt::Debug;
4use std::marker::PhantomData;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use chrono::{DateTime, Utc};
10use futures::channel::mpsc::unbounded;
11use futures::task::{Context, Poll};
12use futures::{
13 channel::{mpsc, oneshot},
14 future::{select, try_join_all, Either},
15 pin_mut, Future, FutureExt, SinkExt, Stream, StreamExt,
16};
17use regex::Regex;
18
19use crate::connection::Connection;
20use crate::error::{ConnectionError, ConsumerError, Error};
21use crate::executor::Executor;
22use crate::message::proto::CommandMessage;
23use crate::message::{
24 parse_batched_message,
25 proto::{self, command_subscribe::SubType, MessageIdData, MessageMetadata, Schema},
26 BatchedMessage, Message as RawMessage, Metadata, Payload,
27};
28use crate::proto::{BaseCommand, CommandCloseConsumer};
29use crate::reader::{Reader, State};
30use crate::{BrokerAddress, DeserializeMessage, Pulsar};
31use core::iter;
32use rand::distributions::Alphanumeric;
33use rand::Rng;
34use std::convert::TryFrom;
35use url::Url;
36
37#[derive(Clone, Default, Debug)]
39pub struct ConsumerOptions {
40 pub priority_level: Option<i32>,
41 pub durable: Option<bool>,
44 pub start_message_id: Option<MessageIdData>,
48 pub metadata: BTreeMap<String, String>,
50 pub read_compacted: Option<bool>,
51 pub schema: Option<Schema>,
52 pub initial_position: InitialPosition,
63}
64
65impl ConsumerOptions {
66 pub fn with_priority_level(mut self, priority_level: i32) -> Self {
68 self.priority_level = Some(priority_level);
69 self
70 }
71
72 pub fn durable(mut self, durable: bool) -> Self {
73 self.durable = Some(durable);
74 self
75 }
76
77 pub fn starting_on_message(mut self, message_id_data: MessageIdData) -> Self {
78 self.start_message_id = Some(message_id_data);
79 self
80 }
81
82 pub fn with_metadata(mut self, metadata: BTreeMap<String, String>) -> Self {
83 self.metadata = metadata;
84 self
85 }
86
87 pub fn read_compacted(mut self, read_compacted: bool) -> Self {
88 self.read_compacted = Some(read_compacted);
89 self
90 }
91
92 pub fn with_schema(mut self, schema: Schema) -> Self {
93 self.schema = Some(schema);
94 self
95 }
96
97 pub fn with_initial_position(mut self, initial_position: InitialPosition) -> Self {
98 self.initial_position = initial_position;
99 self
100 }
101}
102
103#[derive(Debug, Clone)]
104pub struct DeadLetterPolicy {
105 pub max_redeliver_count: usize,
107 pub dead_letter_topic: String,
109}
110
111#[derive(Clone, Debug)]
113pub enum InitialPosition {
114 Earliest,
116 Latest,
118}
119
120impl Default for InitialPosition {
121 fn default() -> Self {
122 InitialPosition::Latest
123 }
124}
125impl From<InitialPosition> for i32 {
126 fn from(i: InitialPosition) -> Self {
127 match i {
128 InitialPosition::Earliest => 1,
129 InitialPosition::Latest => 0,
130 }
131 }
132}
133
134pub struct Consumer<T: DeserializeMessage, Exe: Executor> {
169 inner: InnerConsumer<T, Exe>,
170}
171impl<T: DeserializeMessage, Exe: Executor> Consumer<T, Exe> {
172 pub fn builder(pulsar: &Pulsar<Exe>) -> ConsumerBuilder<Exe> {
174 ConsumerBuilder::new(pulsar)
175 }
176
177 pub async fn check_connection(&mut self) -> Result<(), Error> {
179 match &mut self.inner {
180 InnerConsumer::Single(c) => c.check_connection().await,
181 InnerConsumer::Multi(c) => c.check_connections().await,
182 }
183 }
184
185 pub async fn ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
187 match &mut self.inner {
188 InnerConsumer::Single(c) => c.ack(msg).await,
189 InnerConsumer::Multi(c) => c.ack(msg).await,
190 }
191 }
192
193 pub async fn cumulative_ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
195 match &mut self.inner {
196 InnerConsumer::Single(c) => c.cumulative_ack(msg).await,
197 InnerConsumer::Multi(c) => c.cumulative_ack(msg).await,
198 }
199 }
200
201 pub async fn nack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
205 match &mut self.inner {
206 InnerConsumer::Single(c) => c.nack(msg).await,
207 InnerConsumer::Multi(c) => c.nack(msg).await,
208 }
209 }
210
211 pub async fn seek(
215 &mut self,
216 consumer_ids: Option<Vec<String>>,
217 message_id: Option<MessageIdData>,
218 timestamp: Option<u64>,
219 client: Pulsar<Exe>,
220 ) -> Result<(), Error> {
221 let inner_consumer: InnerConsumer<T, Exe> = match &mut self.inner {
222 InnerConsumer::Single(c) => {
223 c.seek(message_id, timestamp).await?;
224 let topic = c.topic().to_string();
225 let addr = client.lookup_topic(&topic).await?;
226 let config = c.config().clone();
227 InnerConsumer::Single(TopicConsumer::new(client, topic, addr, config).await?)
228 }
229 InnerConsumer::Multi(c) => {
230 c.seek(consumer_ids, message_id, timestamp).await?;
231 let topics = c.topics();
232
233 let addrs =
235 try_join_all(topics.into_iter().map(|topic| client.lookup_topic(topic)))
236 .await?;
237
238 let topic_addr_pair = c.topics.iter().cloned().zip(addrs.iter().cloned());
239
240 let consumers = try_join_all(topic_addr_pair.map(|(topic, addr)| {
241 TopicConsumer::new(client.clone(), topic, addr, c.config().clone())
242 }))
243 .await?;
244
245 let consumers: BTreeMap<_, _> = consumers
246 .into_iter()
247 .map(|c| (c.topic(), Box::pin(c)))
248 .collect();
249 let topics = consumers.keys().cloned().collect();
250 let topic_refresh = Duration::from_secs(30);
251 let refresh = Box::pin(client.executor.interval(topic_refresh).map(drop));
252 let namespace = c.namespace.clone();
253 let config = c.config().clone();
254 let topic_regex = c.topic_regex.clone();
255 InnerConsumer::Multi(MultiTopicConsumer {
256 namespace,
257 topic_regex,
258 pulsar: client,
259 consumers,
260 topics,
261 new_consumers: None,
262 refresh,
263 config,
264 disc_last_message_received: None,
265 disc_messages_received: 0,
266 })
267 }
268 };
269
270 self.inner = inner_consumer;
271 Ok(())
272 }
273
274 pub async fn unsubscribe(&mut self) -> Result<(), Error> {
275 match &mut self.inner {
276 InnerConsumer::Single(c) => c.unsubscribe().await,
277 InnerConsumer::Multi(c) => c.unsubscribe().await,
278 }
279 }
280
281 pub fn topics(&self) -> Vec<String> {
283 match &self.inner {
284 InnerConsumer::Single(c) => vec![c.topic()],
285 InnerConsumer::Multi(c) => c.topics(),
286 }
287 }
288
289 pub async fn connections(&mut self) -> Result<Vec<Url>, Error> {
291 match &mut self.inner {
292 InnerConsumer::Single(c) => Ok(vec![c.connection().await?.url().clone()]),
293 InnerConsumer::Multi(c) => {
294 let v = c
295 .consumers
296 .values_mut()
297 .map(|c| c.connection())
298 .collect::<Vec<_>>();
299
300 let mut connections = try_join_all(v).await?;
301 Ok(connections
302 .drain(..)
303 .map(|conn| conn.url().clone())
304 .collect::<BTreeSet<_>>()
305 .into_iter()
306 .collect())
307 }
308 }
309 }
310
311 pub fn options(&self) -> &ConsumerOptions {
313 match &self.inner {
314 InnerConsumer::Single(c) => &c.config.options,
315 InnerConsumer::Multi(c) => &c.config.options,
316 }
317 }
318
319 pub fn dead_letter_policy(&self) -> Option<&DeadLetterPolicy> {
321 match &self.inner {
322 InnerConsumer::Single(c) => c.dead_letter_policy.as_ref(),
323 InnerConsumer::Multi(c) => c.config.dead_letter_policy.as_ref(),
324 }
325 }
326
327 pub fn subscription(&self) -> &str {
329 match &self.inner {
330 InnerConsumer::Single(c) => &c.config.subscription,
331 InnerConsumer::Multi(c) => &c.config.subscription,
332 }
333 }
334
335 pub fn sub_type(&self) -> SubType {
337 match &self.inner {
338 InnerConsumer::Single(c) => c.config.sub_type,
339 InnerConsumer::Multi(c) => c.config.sub_type,
340 }
341 }
342
343 pub fn batch_size(&self) -> Option<u32> {
345 match &self.inner {
346 InnerConsumer::Single(c) => c.config.batch_size,
347 InnerConsumer::Multi(c) => c.config.batch_size,
348 }
349 }
350
351 pub fn consumer_name(&self) -> Option<&str> {
353 match &self.inner {
354 InnerConsumer::Single(c) => &c.config.consumer_name,
355 InnerConsumer::Multi(c) => &c.config.consumer_name,
356 }
357 .as_ref()
358 .map(|s| s.as_str())
359 }
360
361 pub fn consumer_id(&self) -> Vec<u64> {
363 match &self.inner {
364 InnerConsumer::Single(c) => vec![c.consumer_id],
365 InnerConsumer::Multi(c) => c.consumers.values().map(|c| c.consumer_id).collect(),
366 }
367 }
368
369 pub fn unacked_message_redelivery_delay(&self) -> Option<Duration> {
374 match &self.inner {
375 InnerConsumer::Single(c) => c.config.unacked_message_redelivery_delay,
376 InnerConsumer::Multi(c) => c.config.unacked_message_redelivery_delay,
377 }
378 }
379
380 pub fn last_message_received(&self) -> Option<DateTime<Utc>> {
382 match &self.inner {
383 InnerConsumer::Single(c) => c.last_message_received(),
384 InnerConsumer::Multi(c) => c.last_message_received(),
385 }
386 }
387
388 pub fn messages_received(&self) -> u64 {
390 match &self.inner {
391 InnerConsumer::Single(c) => c.messages_received(),
392 InnerConsumer::Multi(c) => c.messages_received(),
393 }
394 }
395}
396
397impl<T: DeserializeMessage, Exe: Executor> Stream for Consumer<T, Exe> {
398 type Item = Result<Message<T>, Error>;
399
400 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
401 match &mut self.inner {
402 InnerConsumer::Single(c) => Pin::new(c).poll_next(cx),
403 InnerConsumer::Multi(c) => Pin::new(c).poll_next(cx),
404 }
405 }
406}
407
408enum InnerConsumer<T: DeserializeMessage, Exe: Executor> {
409 Single(TopicConsumer<T, Exe>),
410 Multi(MultiTopicConsumer<T, Exe>),
411}
412
413type MessageIdDataReceiver = mpsc::Receiver<Result<(proto::MessageIdData, Payload), Error>>;
414
415pub(crate) struct TopicConsumer<T: DeserializeMessage, Exe: Executor> {
417 pub(crate) consumer_id: u64,
418 pub(crate) config: ConsumerConfig,
419 topic: String,
420 messages: Pin<Box<MessageIdDataReceiver>>,
421 engine_tx: mpsc::UnboundedSender<EngineMessage<Exe>>,
422 #[allow(unused)]
423 data_type: PhantomData<fn(Payload) -> T::Output>,
424 pub(crate) dead_letter_policy: Option<DeadLetterPolicy>,
425 last_message_received: Option<DateTime<Utc>>,
426 messages_received: u64,
427}
428
429impl<T: DeserializeMessage, Exe: Executor> TopicConsumer<T, Exe> {
430 async fn new(
431 client: Pulsar<Exe>,
432 topic: String,
433 mut addr: BrokerAddress,
434 config: ConsumerConfig,
435 ) -> Result<TopicConsumer<T, Exe>, Error> {
436 let ConsumerConfig {
437 subscription,
438 sub_type,
439 batch_size,
440 consumer_name,
441 consumer_id,
442 unacked_message_redelivery_delay,
443 options,
444 dead_letter_policy,
445 } = config.clone();
446 let consumer_id = consumer_id.unwrap_or_else(rand::random);
447 let (resolver, messages) = mpsc::unbounded();
448 let batch_size = batch_size.unwrap_or(1000);
449
450 let mut connection = client.manager.get_connection(&addr).await?;
451 let mut current_retries = 0u32;
452 let start = std::time::Instant::now();
453 let operation_retry_options = client.operation_retry_options.clone();
454
455 loop {
456 match connection
457 .sender()
458 .subscribe(
459 resolver.clone(),
460 topic.clone(),
461 subscription.clone(),
462 sub_type,
463 consumer_id,
464 consumer_name.clone(),
465 options.clone(),
466 )
467 .await
468 {
469 Ok(_) => {
470 if current_retries > 0 {
471 let dur = (std::time::Instant::now() - start).as_secs();
472 log::info!(
473 "subscribe({}) success after {} retries over {} seconds",
474 topic,
475 current_retries + 1,
476 dur
477 );
478 }
479 break;
480 }
481 Err(ConnectionError::PulsarError(
482 Some(proto::ServerError::ServiceNotReady),
483 text,
484 )) => {
485 if operation_retry_options.max_retries.is_none()
486 || operation_retry_options.max_retries.unwrap() > current_retries
487 {
488 error!("subscribe({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}",
489 topic, operation_retry_options.retry_delay.as_millis(),
490 operation_retry_options.max_retries, text.unwrap_or_else(String::new));
491
492 current_retries += 1;
493 client
494 .executor
495 .delay(operation_retry_options.retry_delay)
496 .await;
497
498 let prev = addr;
500 addr = client.lookup_topic(&topic).await?;
501 if prev != addr {
502 info!(
503 "topic {} moved: previous = {:?}, new = {:?}",
504 topic, prev, addr
505 );
506 }
507
508 connection = client.manager.get_connection(&addr).await?;
509 continue;
510 } else {
511 error!("subscribe({}) reached max retries", topic);
512
513 return Err(ConnectionError::PulsarError(
514 Some(proto::ServerError::ServiceNotReady),
515 text,
516 )
517 .into());
518 }
519 }
520 Err(e) => return Err(Error::Connection(e)),
521 }
522 }
523
524 connection
525 .sender()
526 .send_flow(consumer_id, batch_size)
527 .map_err(|e| {
528 error!("TopicConsumer::new error[{}]: {:?}", line!(), e);
529 e
530 })
531 .map_err(|e| Error::Consumer(ConsumerError::Connection(e)))?;
532
533 let (engine_tx, engine_rx) = unbounded();
534 let (_drop_signal, drop_receiver) = oneshot::channel::<()>();
537 let conn = connection.clone();
538 let name = consumer_name.clone();
540 let topic_name = topic.clone();
541 let _ = client.executor.spawn(Box::pin(async move {
542 let _res = drop_receiver.await;
543 if _res.is_err() {
545 if let Err(e) = conn.sender().close_consumer(consumer_id).await {
546 error!(
547 "could not close consumer {:?}({}) for topic {}: {:?}",
548 consumer_name, consumer_id, topic_name, e
549 );
550 }
551 }
552 }));
553
554 if unacked_message_redelivery_delay.is_some() {
555 let mut redelivery_tx = engine_tx.clone();
556 let mut interval = client.executor.interval(Duration::from_millis(500));
557 let res = client.executor.spawn(Box::pin(async move {
558 while interval.next().await.is_some() {
559 if redelivery_tx
560 .send(EngineMessage::UnackedRedelivery)
561 .await
562 .is_err()
563 {
564 break;
566 }
567 }
568 }));
569 if res.is_err() {
570 return Err(Error::Executor);
571 }
572 }
573 let (tx, rx) = mpsc::channel(1000);
574 let mut c = ConsumerEngine::new(
575 client.clone(),
576 connection.clone(),
577 topic.clone(),
578 subscription.clone(),
579 sub_type,
580 consumer_id,
581 name,
582 tx,
583 messages,
584 engine_rx,
585 batch_size,
586 unacked_message_redelivery_delay,
587 dead_letter_policy.clone(),
588 options.clone(),
589 _drop_signal,
590 );
591 let f = async move {
592 c.engine()
593 .map(|res| {
594 debug!("consumer engine stopped: {:?}", res);
595 })
596 .await;
597 };
598 if client.executor.spawn(Box::pin(f)).is_err() {
599 return Err(Error::Executor);
600 }
601
602 Ok(TopicConsumer {
603 consumer_id,
604 config,
605 topic,
606 messages: Box::pin(rx),
607 engine_tx,
608 data_type: PhantomData,
609 dead_letter_policy,
610 last_message_received: None,
611 messages_received: 0,
612 })
613 }
614
615 pub fn topic(&self) -> String {
616 self.topic.clone()
617 }
618
619 pub async fn connection(&mut self) -> Result<Arc<Connection<Exe>>, Error> {
620 let (resolver, response) = oneshot::channel();
621 self.engine_tx
622 .send(EngineMessage::GetConnection(resolver))
623 .await
624 .map_err(|_| ConsumerError::Connection(ConnectionError::Disconnected))?;
625
626 response.await.map_err(|oneshot::Canceled| {
627 error!("the consumer engine dropped the request");
628 ConnectionError::Disconnected.into()
629 })
630 }
631
632 pub async fn check_connection(&mut self) -> Result<(), Error> {
633 let conn = self.connection().await?;
634 info!("check connection for id {}", conn.id());
635 conn.sender().send_ping().await?;
636 Ok(())
637 }
638
639 pub async fn ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
640 self.engine_tx
641 .send(EngineMessage::Ack(msg.message_id.clone(), false))
642 .await?;
643 Ok(())
644 }
645
646 pub(crate) fn acker(&self) -> mpsc::UnboundedSender<EngineMessage<Exe>> {
647 self.engine_tx.clone()
648 }
649
650 async fn cumulative_ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
651 self.engine_tx
652 .send(EngineMessage::Ack(msg.message_id.clone(), true))
653 .await?;
654 Ok(())
655 }
656
657 async fn nack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
658 self.engine_tx
659 .send(EngineMessage::Nack(msg.message_id.clone()))
660 .await?;
661 Ok(())
662 }
663
664 pub async fn seek(
665 &mut self,
666 message_id: Option<MessageIdData>,
667 timestamp: Option<u64>,
668 ) -> Result<(), Error> {
669 let consumer_id = self.consumer_id;
670 self.connection()
671 .await?
672 .sender()
673 .seek(consumer_id, message_id, timestamp)
674 .await?;
675 Ok(())
676 }
677
678 pub async fn unsubscribe(&mut self) -> Result<(), Error> {
679 let consumer_id = self.consumer_id;
680 self.connection()
681 .await?
682 .sender()
683 .unsubscribe(consumer_id)
684 .await?;
685 Ok(())
686 }
687
688 pub fn last_message_received(&self) -> Option<DateTime<Utc>> {
689 self.last_message_received
690 }
691
692 pub fn messages_received(&self) -> u64 {
693 self.messages_received
694 }
695
696 fn config(&self) -> &ConsumerConfig {
697 &self.config
698 }
699
700 fn create_message(&self, message_id: proto::MessageIdData, payload: Payload) -> Message<T> {
701 Message {
702 topic: self.topic.clone(),
703 message_id: MessageData {
704 id: message_id,
705 batch_size: payload.metadata.num_messages_in_batch,
706 },
707 payload,
708 _phantom: PhantomData,
709 }
710 }
711}
712
713impl<T: DeserializeMessage, Exe: Executor> Stream for TopicConsumer<T, Exe> {
714 type Item = Result<Message<T>, Error>;
715
716 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
717 match self.messages.as_mut().poll_next(cx) {
718 Poll::Pending => Poll::Pending,
719 Poll::Ready(None) => Poll::Ready(None),
720 Poll::Ready(Some(Ok((id, payload)))) => {
721 self.last_message_received = Some(Utc::now());
722 self.messages_received += 1;
723 Poll::Ready(Some(Ok(self.create_message(id, payload))))
724 }
725 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
726 }
727 }
728}
729
730struct ConsumerEngine<Exe: Executor> {
731 client: Pulsar<Exe>,
732 connection: Arc<Connection<Exe>>,
733 topic: String,
734 subscription: String,
735 sub_type: SubType,
736 id: u64,
737 name: Option<String>,
738 tx: mpsc::Sender<Result<(proto::MessageIdData, Payload), Error>>,
739 messages_rx: Option<mpsc::UnboundedReceiver<RawMessage>>,
740 engine_rx: Option<mpsc::UnboundedReceiver<EngineMessage<Exe>>>,
741 batch_size: u32,
742 remaining_messages: u32,
743 unacked_message_redelivery_delay: Option<Duration>,
744 unacked_messages: HashMap<MessageIdData, Instant>,
745 dead_letter_policy: Option<DeadLetterPolicy>,
746 options: ConsumerOptions,
747 _drop_signal: oneshot::Sender<()>,
748}
749
750pub(crate) enum EngineMessage<Exe: Executor> {
751 Ack(MessageData, bool),
752 Nack(MessageData),
753 UnackedRedelivery,
754 GetConnection(oneshot::Sender<Arc<Connection<Exe>>>),
755}
756
757impl<Exe: Executor> ConsumerEngine<Exe> {
758 fn new(
759 client: Pulsar<Exe>,
760 connection: Arc<Connection<Exe>>,
761 topic: String,
762 subscription: String,
763 sub_type: SubType,
764 id: u64,
765 name: Option<String>,
766 tx: mpsc::Sender<Result<(proto::MessageIdData, Payload), Error>>,
767 messages_rx: mpsc::UnboundedReceiver<RawMessage>,
768 engine_rx: mpsc::UnboundedReceiver<EngineMessage<Exe>>,
769 batch_size: u32,
770 unacked_message_redelivery_delay: Option<Duration>,
771 dead_letter_policy: Option<DeadLetterPolicy>,
772 options: ConsumerOptions,
773 _drop_signal: oneshot::Sender<()>,
774 ) -> ConsumerEngine<Exe> {
775 ConsumerEngine {
776 client,
777 connection,
778 topic,
779 subscription,
780 sub_type,
781 id,
782 name,
783 tx,
784 messages_rx: Some(messages_rx),
785 engine_rx: Some(engine_rx),
786 batch_size,
787 remaining_messages: batch_size,
788 unacked_message_redelivery_delay,
789 unacked_messages: HashMap::new(),
790 dead_letter_policy,
791 options,
792 _drop_signal,
793 }
794 }
795
796 async fn engine(&mut self) -> Result<(), Error> {
797 debug!("starting the consumer engine for topic {}", self.topic);
798 let mut messages_or_ack_f = None;
799 loop {
800 if !self.connection.is_valid() {
801 if let Some(err) = self.connection.error() {
802 error!(
803 "Consumer: connection {} is not valid: {:?}",
804 self.connection.id(),
805 err
806 );
807 self.reconnect().await?;
808 }
809 }
810
811 if self.remaining_messages < self.batch_size / 2 {
812 match self
813 .connection
814 .sender()
815 .send_flow(self.id, self.batch_size - self.remaining_messages)
816 {
817 Ok(()) => {}
818 Err(ConnectionError::Disconnected) => {
819 self.reconnect().await?;
820 self.connection
821 .sender()
822 .send_flow(self.id, self.batch_size - self.remaining_messages)?;
823 }
824 Err(e) => return Err(e.into()),
825 }
826 self.remaining_messages = self.batch_size;
827 }
828
829 let mut f = match messages_or_ack_f.take() {
830 None => {
831 let messages_f = self.messages_rx.take().unwrap().into_future();
837 let ack_f = self.engine_rx.take().unwrap().into_future();
838 select(messages_f, ack_f)
839 }
840 Some(f) => f,
841 };
842
843 let delay_f = self.client.executor.delay(Duration::from_secs(1));
848 let f_pin = std::pin::Pin::new(&mut f);
849 pin_mut!(delay_f);
850
851 let f = match select(f_pin, delay_f).await {
852 Either::Left((res, _)) => res,
853 Either::Right((_, _f)) => {
854 messages_or_ack_f = Some(f);
855 continue;
856 }
857 };
858
859 match f {
860 Either::Left(((message_opt, messages_rx), engine_rx)) => {
861 self.messages_rx = Some(messages_rx);
862 self.engine_rx = engine_rx.into_inner();
863 match message_opt {
864 None => {
865 error!("Consumer: messages::next: returning Disconnected");
866 self.reconnect().await?;
867 continue;
868 }
870 Some(message) => {
871 self.remaining_messages -= message
872 .payload
873 .as_ref()
874 .and_then(|payload| payload.metadata.num_messages_in_batch)
875 .unwrap_or(1i32)
876 as u32;
877
878 match self.process_message(message).await {
879 Ok(true) => {}
881 Ok(false) => {
883 return Ok(());
884 }
885 Err(e) => {
886 if let Err(e) = self.tx.send(Err(e)).await {
887 error!("cannot send a message from the consumer engine to the consumer({}), stopping the engine", self.id);
888 return Err(Error::Consumer(e.into()));
889 }
890 }
891 }
892 }
893 }
894 }
895 Either::Right(((ack_opt, engine_rx), messages_rx)) => {
896 self.messages_rx = messages_rx.into_inner();
897 self.engine_rx = Some(engine_rx);
898
899 match ack_opt {
900 None => {
901 trace!("ack channel was closed");
902 return Ok(());
903 }
904 Some(EngineMessage::Ack(message_id, cumulative)) => {
905 self.ack(message_id, cumulative);
906 }
907 Some(EngineMessage::Nack(message_id)) => {
908 if let Err(e) = self
909 .connection
910 .sender()
911 .send_redeliver_unacknowleged_messages(
912 self.id,
913 vec![message_id.id.clone()],
914 )
915 {
916 error!(
917 "could not ask for redelivery for message {:?}: {:?}",
918 message_id, e
919 );
920 }
921 }
922 Some(EngineMessage::UnackedRedelivery) => {
923 let mut h = HashSet::new();
924 let now = Instant::now();
925 for (id, t) in self.unacked_messages.iter() {
927 if *t < now {
928 h.insert(id.clone());
929 }
930 }
931
932 let ids: Vec<_> = h.iter().cloned().collect();
933 if !ids.is_empty() {
934 if let Err(e) = self
936 .connection
937 .sender()
938 .send_redeliver_unacknowleged_messages(self.id, ids)
939 {
940 error!("could not ask for redelivery: {:?}", e);
941 } else {
942 for i in h.iter() {
943 self.unacked_messages.remove(i);
944 }
945 }
946 }
947 }
948 Some(EngineMessage::GetConnection(sender)) => {
949 let _ = sender.send(self.connection.clone()).map_err(|_| {
950 error!("consumer requested the engine's connection but dropped the channel before receiving");
951 });
952 }
953 }
954 }
955 };
956 }
957 }
958
959 fn ack(&mut self, message_id: MessageData, cumulative: bool) {
960 self.unacked_messages.remove(&message_id.id);
962 let res = self
963 .connection
964 .sender()
965 .send_ack(self.id, vec![message_id.id], cumulative);
966 if res.is_err() {
967 error!("ack error: {:?}", res);
968 }
969 }
970
971 async fn process_message(&mut self, message: RawMessage) -> Result<bool, Error> {
973 match message {
974 RawMessage {
975 command:
976 BaseCommand {
977 reached_end_of_topic: Some(_),
978 ..
979 },
980 ..
981 } => {
982 return Ok(false);
983 }
984 RawMessage {
985 command:
986 BaseCommand {
987 active_consumer_change: Some(active_consumer_change),
988 ..
989 },
990 ..
991 } => {
992 debug!(
994 "Active consumer change for {} - Active: {:?}",
995 self.debug_format(),
996 active_consumer_change.is_active
997 );
998 }
999 RawMessage {
1000 command:
1001 BaseCommand {
1002 message: Some(message),
1003 ..
1004 },
1005 payload: Some(payload),
1006 } => {
1007 self.process_payload(message, payload).await?;
1008 }
1009 RawMessage {
1010 command: BaseCommand {
1011 message: Some(_), ..
1012 },
1013 payload: None,
1014 } => {
1015 error!(
1016 "Consumer {} received message without payload",
1017 self.debug_format()
1018 );
1019 }
1020 RawMessage {
1021 command:
1022 BaseCommand {
1023 close_consumer: Some(CommandCloseConsumer { consumer_id, .. }),
1024 ..
1025 },
1026 ..
1027 } => {
1028 error!(
1029 "Broker notification of closed consumer {}: {}",
1030 consumer_id,
1031 self.debug_format()
1032 );
1033
1034 self.reconnect().await?;
1035 }
1036 unexpected => {
1037 let r#type = proto::base_command::Type::try_from(unexpected.command.r#type)
1038 .map(|t| format!("{:?}", t))
1039 .unwrap_or_else(|_| unexpected.command.r#type.to_string());
1040 warn!(
1041 "Unexpected message type sent to consumer: {}. This is probably a bug!",
1042 r#type
1043 );
1044 }
1045 }
1046 Ok(true)
1047 }
1048
1049 async fn process_payload(
1050 &mut self,
1051 message: CommandMessage,
1052 mut payload: Payload,
1053 ) -> Result<(), Error> {
1054 let compression = payload.metadata.compression;
1055
1056 let payload = match compression {
1057 None | Some(0) => payload,
1058 Some(1) => {
1060 #[cfg(not(feature = "lz4"))]
1061 {
1062 return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1063 std::io::ErrorKind::Other,
1064 "got a LZ4 compressed message but 'lz4' cargo feature is deactivated",
1065 )))
1066 .into());
1067 }
1068
1069 #[cfg(feature = "lz4")]
1070 {
1071 let decompressed_payload = lz4::block::decompress(
1072 &payload.data[..],
1073 payload.metadata.uncompressed_size.map(|i| i as i32),
1074 )
1075 .map_err(ConsumerError::Io)?;
1076
1077 payload.data = decompressed_payload;
1078 payload
1079 }
1080 }
1081 Some(2) => {
1083 #[cfg(not(feature = "flate2"))]
1084 {
1085 return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1086 std::io::ErrorKind::Other,
1087 "got a zlib compressed message but 'flate2' cargo feature is deactivated",
1088 )))
1089 .into());
1090 }
1091
1092 #[cfg(feature = "flate2")]
1093 {
1094 use flate2::read::ZlibDecoder;
1095 use std::io::Read;
1096
1097 let mut d = ZlibDecoder::new(&payload.data[..]);
1098 let mut decompressed_payload = Vec::new();
1099 d.read_to_end(&mut decompressed_payload)
1100 .map_err(ConsumerError::Io)?;
1101
1102 payload.data = decompressed_payload;
1103 payload
1104 }
1105 }
1106 Some(3) => {
1108 #[cfg(not(feature = "zstd"))]
1109 {
1110 return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1111 std::io::ErrorKind::Other,
1112 "got a zstd compressed message but 'zstd' cargo feature is deactivated",
1113 )))
1114 .into());
1115 }
1116
1117 #[cfg(feature = "zstd")]
1118 {
1119 let decompressed_payload =
1120 zstd::decode_all(&payload.data[..]).map_err(ConsumerError::Io)?;
1121
1122 payload.data = decompressed_payload;
1123 payload
1124 }
1125 }
1126 Some(4) => {
1128 #[cfg(not(feature = "snap"))]
1129 {
1130 return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1131 std::io::ErrorKind::Other,
1132 "got a Snappy compressed message but 'snap' cargo feature is deactivated",
1133 )))
1134 .into());
1135 }
1136
1137 #[cfg(feature = "snap")]
1138 {
1139 use std::io::Read;
1140
1141 let mut decompressed_payload = Vec::new();
1142 let mut decoder = snap::read::FrameDecoder::new(&payload.data[..]);
1143 decoder
1144 .read_to_end(&mut decompressed_payload)
1145 .map_err(ConsumerError::Io)?;
1146
1147 payload.data = decompressed_payload;
1148 payload
1149 }
1150 }
1151 Some(i) => {
1152 error!("unknown compression type: {}", i);
1153 return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1154 std::io::ErrorKind::Other,
1155 format!("unknown compression type: {}", i),
1156 ))));
1157 }
1158 };
1159
1160 match payload.metadata.num_messages_in_batch {
1161 Some(_) => {
1162 let it = BatchedMessageIterator::new(message.message_id, payload)?;
1163 for (id, payload) in it {
1164 self.send_to_consumer(id, payload).await?;
1166 }
1167 }
1168 None => match (message.redelivery_count, self.dead_letter_policy.as_ref()) {
1169 (Some(redelivery_count), Some(dead_letter_policy)) => {
1170 if redelivery_count as usize >= dead_letter_policy.max_redeliver_count {
1172 self.client
1173 .send(&dead_letter_policy.dead_letter_topic, payload.data)
1174 .await?
1175 .await
1176 .map_err(|e| {
1177 error!("One shot cancelled {:?}", e);
1178 Error::Custom("DLQ send error".to_string())
1179 })?;
1180
1181 self.ack(
1182 MessageData {
1183 id: message.message_id,
1184 batch_size: None,
1185 },
1186 false,
1187 );
1188 } else {
1189 self.send_to_consumer(message.message_id, payload).await?
1190 }
1191 }
1192 _ => self.send_to_consumer(message.message_id, payload).await?,
1193 },
1194 }
1195 Ok(())
1196 }
1197
1198 async fn send_to_consumer(
1199 &mut self,
1200 message_id: MessageIdData,
1201 payload: Payload,
1202 ) -> Result<(), Error> {
1203 let now = Instant::now();
1204 self.tx
1205 .send(Ok((message_id.clone(), payload)))
1206 .await
1207 .map_err(|e| {
1208 error!("tx returned {:?}", e);
1209 Error::Custom("tx closed".to_string())
1210 })?;
1211 if let Some(duration) = self.unacked_message_redelivery_delay {
1212 self.unacked_messages.insert(message_id, now + duration);
1213 }
1214 Ok(())
1215 }
1216
1217 async fn reconnect(&mut self) -> Result<(), Error> {
1218 debug!("reconnecting consumer for topic: {}", self.topic);
1219 let broker_address = self.client.lookup_topic(&self.topic).await?;
1220 let conn = self.client.manager.get_connection(&broker_address).await?;
1221
1222 self.connection = conn;
1223
1224 let topic = self.topic.clone();
1225 let (resolver, messages) = mpsc::unbounded();
1226
1227 self.connection
1228 .sender()
1229 .subscribe(
1230 resolver,
1231 topic.clone(),
1232 self.subscription.clone(),
1233 self.sub_type,
1234 self.id,
1235 self.name.clone(),
1236 self.options.clone(),
1237 )
1238 .await
1239 .map_err(Error::Connection)?;
1240
1241 self.connection
1242 .sender()
1243 .send_flow(self.id, self.batch_size)
1244 .map_err(|e| Error::Consumer(ConsumerError::Connection(e)))?;
1245
1246 self.messages_rx = Some(messages);
1247
1248 let (_drop_signal, drop_receiver) = oneshot::channel::<()>();
1251 let conn = self.connection.clone();
1252 let name = self.name.clone();
1253 let id = self.id;
1254 let topic = self.topic.clone();
1255 let _ = self.client.executor.spawn(Box::pin(async move {
1256 let _res = drop_receiver.await;
1257 if _res.is_err() {
1259 if let Err(e) = conn.sender().close_consumer(id).await {
1260 error!(
1261 "could not close consumer {:?}({}) for topic {}: {:?}",
1262 name, id, topic, e
1263 );
1264 }
1265 }
1266 }));
1267 let old_signal = std::mem::replace(&mut self._drop_signal, _drop_signal);
1268 if let Err(e) = old_signal.send(()) {
1269 error!(
1270 "could not send the drop signal to the old consumer(id={}): {:?}",
1271 id, e
1272 );
1273 }
1274
1275 Ok(())
1276 }
1277
1278 fn debug_format(&self) -> String {
1279 format!(
1280 "[{id} - {subscription}{name}: {topic}]",
1281 id = self.id,
1282 subscription = &self.subscription,
1283 name = self
1284 .name
1285 .as_ref()
1286 .map(|s| format!("({})", s))
1287 .unwrap_or_default(),
1288 topic = &self.topic
1289 )
1290 }
1291}
1292
1293#[derive(Clone, Debug, PartialEq)]
1294pub struct MessageData {
1295 pub id: proto::MessageIdData,
1296 batch_size: Option<i32>,
1297}
1298
1299struct BatchedMessageIterator {
1300 messages: std::vec::IntoIter<BatchedMessage>,
1301 message_id: proto::MessageIdData,
1302 metadata: Metadata,
1303 total_messages: u32,
1304 current_index: u32,
1305}
1306
1307impl BatchedMessageIterator {
1308 fn new(message_id: proto::MessageIdData, payload: Payload) -> Result<Self, ConnectionError> {
1309 let total_messages = payload
1310 .metadata
1311 .num_messages_in_batch
1312 .expect("expected batched message") as u32;
1313 let messages = parse_batched_message(total_messages, &payload.data)?;
1314
1315 Ok(Self {
1316 messages: messages.into_iter(),
1317 message_id,
1318 total_messages,
1319 metadata: payload.metadata,
1320 current_index: 0,
1321 })
1322 }
1323}
1324
1325impl Iterator for BatchedMessageIterator {
1326 type Item = (proto::MessageIdData, Payload);
1327
1328 fn next(&mut self) -> Option<Self::Item> {
1329 let remaining = self.total_messages - self.current_index;
1330 if remaining == 0 {
1331 return None;
1332 }
1333 let index = self.current_index;
1334 self.current_index += 1;
1335 if let Some(batched_message) = self.messages.next() {
1336 let id = proto::MessageIdData {
1337 batch_index: Some(index as i32),
1338 ..self.message_id.clone()
1339 };
1340
1341 let metadata = Metadata {
1342 properties: batched_message.metadata.properties,
1343 partition_key: batched_message.metadata.partition_key,
1344 event_time: batched_message.metadata.event_time,
1345 ..self.metadata.clone()
1346 };
1347
1348 let payload = Payload {
1349 metadata,
1350 data: batched_message.payload,
1351 };
1352
1353 Some((id, payload))
1354 } else {
1355 None
1356 }
1357 }
1358}
1359
1360#[derive(Clone)]
1364pub struct ConsumerBuilder<Exe: Executor> {
1365 pulsar: Pulsar<Exe>,
1366 topics: Option<Vec<String>>,
1367 topic_regex: Option<Regex>,
1368 subscription: Option<String>,
1369 subscription_type: Option<SubType>,
1370 consumer_id: Option<u64>,
1371 consumer_name: Option<String>,
1372 batch_size: Option<u32>,
1373 unacked_message_resend_delay: Option<Duration>,
1374 dead_letter_policy: Option<DeadLetterPolicy>,
1375 consumer_options: Option<ConsumerOptions>,
1376 namespace: Option<String>,
1377 topic_refresh: Option<Duration>,
1378}
1379
1380impl<Exe: Executor> ConsumerBuilder<Exe> {
1381 pub fn new(pulsar: &Pulsar<Exe>) -> Self {
1383 ConsumerBuilder {
1384 pulsar: pulsar.clone(),
1385 topics: None,
1386 topic_regex: None,
1387 subscription: None,
1388 subscription_type: None,
1389 consumer_id: None,
1390 consumer_name: None,
1391 batch_size: None,
1392 unacked_message_resend_delay: None,
1394 dead_letter_policy: None,
1395 consumer_options: None,
1396 namespace: None,
1397 topic_refresh: None,
1398 }
1399 }
1400
1401 pub fn with_topic<S: Into<String>>(mut self, topic: S) -> ConsumerBuilder<Exe> {
1403 match &mut self.topics {
1404 Some(topics) => topics.push(topic.into()),
1405 None => self.topics = Some(vec![topic.into()]),
1406 }
1407 self
1408 }
1409
1410 pub fn with_topics<S: AsRef<str>, I: IntoIterator<Item = S>>(
1412 mut self,
1413 topics: I,
1414 ) -> ConsumerBuilder<Exe> {
1415 let new_topics = topics.into_iter().map(|t| t.as_ref().into());
1416 match &mut self.topics {
1417 Some(topics) => {
1418 topics.extend(new_topics);
1419 }
1420 None => self.topics = Some(new_topics.collect()),
1421 }
1422 self
1423 }
1424
1425 pub fn with_topic_regex(mut self, regex: Regex) -> ConsumerBuilder<Exe> {
1428 self.topic_regex = Some(regex);
1429 self
1430 }
1431
1432 pub fn with_subscription<S: Into<String>>(mut self, subscription: S) -> Self {
1434 self.subscription = Some(subscription.into());
1435 self
1436 }
1437
1438 pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
1440 self.subscription_type = Some(subscription_type);
1441 self
1442 }
1443
1444 pub fn with_lookup_namespace<S: Into<String>>(mut self, namespace: S) -> Self {
1449 self.namespace = Some(namespace.into());
1450 self
1451 }
1452
1453 pub fn with_topic_refresh(mut self, refresh_interval: Duration) -> Self {
1455 self.topic_refresh = Some(refresh_interval);
1456 self
1457 }
1458
1459 pub fn with_consumer_id(mut self, consumer_id: u64) -> Self {
1461 self.consumer_id = Some(consumer_id);
1462 self
1463 }
1464
1465 pub fn with_consumer_name<S: Into<String>>(mut self, consumer_name: S) -> Self {
1467 self.consumer_name = Some(consumer_name.into());
1468 self
1469 }
1470
1471 pub fn with_batch_size(mut self, batch_size: u32) -> Self {
1478 self.batch_size = Some(batch_size);
1479 self
1480 }
1481
1482 pub fn with_options(mut self, options: ConsumerOptions) -> Self {
1484 self.consumer_options = Some(options);
1485 self
1486 }
1487
1488 pub fn with_dead_letter_policy(mut self, dead_letter_policy: DeadLetterPolicy) -> Self {
1490 self.dead_letter_policy = Some(dead_letter_policy);
1491 self
1492 }
1493
1494 pub fn with_unacked_message_resend_delay(mut self, delay: Option<Duration>) -> Self {
1498 self.unacked_message_resend_delay = delay;
1499 self
1500 }
1501
1502 async fn validate<T: DeserializeMessage>(
1505 self,
1506 ) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
1507 let ConsumerBuilder {
1508 pulsar,
1509 topics,
1510 topic_regex,
1511 subscription,
1512 subscription_type,
1513 consumer_id,
1514 mut consumer_name,
1515 batch_size,
1516 unacked_message_resend_delay,
1517 consumer_options,
1518 dead_letter_policy,
1519 namespace: _,
1520 topic_refresh: _,
1521 } = self;
1522
1523 if consumer_name.is_none() {
1524 let s: String = (0..8)
1525 .map(|_| rand::thread_rng().sample(Alphanumeric))
1526 .map(|c| c as char)
1527 .collect();
1528 consumer_name = Some(format!("consumer_{}", s));
1529 }
1530
1531 if topics.is_none() && topic_regex.is_none() {
1532 return Err(Error::Custom(
1533 "Cannot create consumer with no topics and no topic regex".into(),
1534 ));
1535 }
1536
1537 let topics: Vec<(String, BrokerAddress)> = try_join_all(
1538 topics
1539 .into_iter()
1540 .flatten()
1541 .map(|topic| pulsar.lookup_partitioned_topic(topic)),
1542 )
1543 .await?
1544 .into_iter()
1545 .flatten()
1546 .collect();
1547
1548 if topics.is_empty() && topic_regex.is_none() {
1549 return Err(Error::Custom(
1550 "Unable to create consumer - topic not found".to_string(),
1551 ));
1552 }
1553
1554 let consumer_id = match (consumer_id, topics.len()) {
1555 (Some(consumer_id), 1) => Some(consumer_id),
1556 (Some(_), _) => {
1557 warn!("Cannot specify consumer id for connecting to partitioned topics or multiple topics");
1558 None
1559 }
1560 _ => None,
1561 };
1562 let subscription = subscription.unwrap_or_else(|| {
1563 let s: String = (0..8)
1564 .map(|_| rand::thread_rng().sample(Alphanumeric))
1565 .map(|c| c as char)
1566 .collect();
1567 let subscription = format!("sub_{}", s);
1568 warn!(
1569 "Subscription not specified. Using new subscription `{}`.",
1570 subscription
1571 );
1572 subscription
1573 });
1574 let sub_type = subscription_type.unwrap_or_else(|| {
1575 warn!("Subscription Type not specified. Defaulting to `Shared`.");
1576 SubType::Shared
1577 });
1578
1579 let config = ConsumerConfig {
1580 subscription,
1581 sub_type,
1582 batch_size,
1583 consumer_name,
1584 consumer_id,
1585 unacked_message_redelivery_delay: unacked_message_resend_delay,
1586 options: consumer_options.unwrap_or_default(),
1587 dead_letter_policy,
1588 };
1589 Ok((config, topics))
1590 }
1591
1592 pub async fn build<T: DeserializeMessage>(self) -> Result<Consumer<T, Exe>, Error> {
1594 let (config, joined_topics) = self.clone().validate::<T>().await?;
1596
1597 let consumers = try_join_all(joined_topics.into_iter().map(|(topic, addr)| {
1598 TopicConsumer::new(self.pulsar.clone(), topic, addr, config.clone())
1599 }))
1600 .await?;
1601
1602 let consumer = if consumers.len() == 1 {
1603 let consumer = consumers.into_iter().next().unwrap();
1604 InnerConsumer::Single(consumer)
1605 } else {
1606 let consumers: BTreeMap<_, _> = consumers
1607 .into_iter()
1608 .map(|c| (c.topic(), Box::pin(c)))
1609 .collect();
1610 let topics = consumers.keys().cloned().collect();
1611 let topic_refresh = self
1612 .topic_refresh
1613 .unwrap_or_else(|| Duration::from_secs(30));
1614 let refresh = Box::pin(self.pulsar.executor.interval(topic_refresh).map(drop));
1615 let mut consumer = MultiTopicConsumer {
1616 namespace: self
1617 .namespace
1618 .unwrap_or_else(|| "public/default".to_string()),
1619 topic_regex: self.topic_regex,
1620 pulsar: self.pulsar,
1621 consumers,
1622 topics,
1623 new_consumers: None,
1624 refresh,
1625 config,
1626 disc_last_message_received: None,
1627 disc_messages_received: 0,
1628 };
1629 if consumer.topic_regex.is_some() {
1630 consumer.update_topics();
1631 let initial_consumers = consumer.new_consumers.take().unwrap().await?;
1632 consumer.add_consumers(initial_consumers);
1633 }
1634 InnerConsumer::Multi(consumer)
1635 };
1636 Ok(Consumer { inner: consumer })
1637 }
1638
1639 pub async fn into_reader<T: DeserializeMessage>(self) -> Result<Reader<T, Exe>, Error> {
1641 let (mut config, mut joined_topics) = self.clone().validate::<T>().await?;
1643
1644 warn!("Subscription Type for a reader is `Exclusive`. Resetting.");
1647 config.sub_type = SubType::Exclusive;
1648
1649 if self.topics.unwrap().len() > 1 {
1650 return Err(Error::Custom(
1651 "Unable to create a reader - one topic max".to_string(),
1652 ));
1653 }
1654
1655 let (topic, addr) = joined_topics.pop().unwrap();
1656 let consumer = TopicConsumer::new(self.pulsar.clone(), topic, addr, config.clone()).await?;
1657
1658 Ok(Reader {
1659 consumer,
1660 state: Some(State::PollingConsumer),
1661 })
1662 }
1663}
1664
1665#[derive(Debug, Clone, Default)]
1667pub(crate) struct ConsumerConfig {
1668 pub(crate) subscription: String,
1670 pub(crate) sub_type: SubType,
1674 pub(crate) batch_size: Option<u32>,
1678 pub(crate) consumer_name: Option<String>,
1680 consumer_id: Option<u64>,
1682 unacked_message_redelivery_delay: Option<Duration>,
1684 pub(crate) options: ConsumerOptions,
1686 dead_letter_policy: Option<DeadLetterPolicy>,
1688}
1689
1690struct MultiTopicConsumer<T: DeserializeMessage, Exe: Executor> {
1692 namespace: String,
1693 topic_regex: Option<Regex>,
1694 pulsar: Pulsar<Exe>,
1695 consumers: BTreeMap<String, Pin<Box<TopicConsumer<T, Exe>>>>,
1696 topics: VecDeque<String>,
1697 #[allow(clippy::type_complexity)]
1698 new_consumers:
1699 Option<Pin<Box<dyn Future<Output = Result<Vec<TopicConsumer<T, Exe>>, Error>> + Send>>>,
1700 refresh: Pin<Box<dyn Stream<Item = ()> + Send>>,
1701 config: ConsumerConfig,
1702 disc_messages_received: u64,
1704 disc_last_message_received: Option<DateTime<Utc>>,
1705}
1706
1707impl<T: DeserializeMessage, Exe: Executor> MultiTopicConsumer<T, Exe> {
1708 fn topics(&self) -> Vec<String> {
1709 self.topics.iter().map(|s| s.to_string()).collect()
1710 }
1711
1712 fn last_message_received(&self) -> Option<DateTime<Utc>> {
1713 self.consumers
1714 .values()
1715 .filter_map(|c| c.last_message_received)
1716 .chain(self.disc_last_message_received)
1717 .max()
1718 }
1719
1720 fn messages_received(&self) -> u64 {
1721 self.consumers
1722 .values()
1723 .map(|c| c.messages_received)
1724 .chain(iter::once(self.disc_messages_received))
1725 .sum()
1726 }
1727
1728 async fn check_connections(&mut self) -> Result<(), Error> {
1729 self.pulsar
1730 .manager
1731 .get_base_connection()
1732 .await?
1733 .sender()
1734 .send_ping()
1735 .await?;
1736
1737 for consumer in self.consumers.values_mut() {
1738 consumer.connection().await?.sender().send_ping().await?;
1739 }
1740
1741 Ok(())
1742 }
1743
1744 async fn unsubscribe(&mut self) -> Result<(), Error> {
1745 for consumer in self.consumers.values_mut() {
1746 consumer.unsubscribe().await?;
1747 }
1748
1749 Ok(())
1750 }
1751
1752 fn add_consumers<I: IntoIterator<Item = TopicConsumer<T, Exe>>>(&mut self, consumers: I) {
1753 for consumer in consumers {
1754 let topic = consumer.topic().to_owned();
1755 self.consumers.insert(topic.clone(), Box::pin(consumer));
1756 self.topics.push_back(topic);
1757 }
1758 }
1759
1760 fn remove_consumers(&mut self, topics: &[String]) {
1761 self.topics.retain(|t| !topics.contains(t));
1762 for topic in topics {
1763 if let Some(consumer) = self.consumers.remove(topic) {
1764 self.disc_messages_received += consumer.messages_received;
1765 self.disc_last_message_received = self
1766 .disc_last_message_received
1767 .into_iter()
1768 .chain(consumer.last_message_received)
1769 .max();
1770 }
1771 }
1772 }
1773
1774 fn update_topics(&mut self) {
1775 if let Some(regex) = self.topic_regex.clone() {
1776 let pulsar = self.pulsar.clone();
1777 let namespace = self.namespace.clone();
1778 let existing_topics: BTreeSet<String> = self.consumers.keys().cloned().collect();
1779 let consumer_config = self.config.clone();
1780
1781 self.new_consumers = Some(Box::pin(async move {
1782 let topics = pulsar
1783 .get_topics_of_namespace(
1784 namespace.clone(),
1785 proto::command_get_topics_of_namespace::Mode::All,
1786 )
1787 .await?;
1788 trace!("fetched topics {:?}", topics);
1789
1790 let topics: Vec<_> = try_join_all(
1791 topics
1792 .into_iter()
1793 .filter(|t| regex.is_match(t))
1794 .map(|topic| pulsar.lookup_partitioned_topic(topic)),
1795 )
1796 .await?
1797 .into_iter()
1798 .flatten()
1799 .collect();
1800
1801 trace!("matched topics {:?} (regex: {})", topics, ®ex);
1802
1803 let consumers = try_join_all(
1804 topics
1805 .into_iter()
1806 .filter(|(t, _)| !existing_topics.contains(t))
1807 .map(|(topic, addr)| {
1808 TopicConsumer::new(pulsar.clone(), topic, addr, consumer_config.clone())
1809 }),
1810 )
1811 .await?;
1812 trace!("created {} consumers", consumers.len());
1813 Ok(consumers)
1814 }));
1815 }
1816 }
1817
1818 async fn ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
1819 if let Some(c) = self.consumers.get_mut(&msg.topic) {
1820 c.ack(msg).await
1821 } else {
1822 Err(ConnectionError::Unexpected(format!("no consumer for topic {}", msg.topic)).into())
1823 }
1824 }
1825
1826 async fn cumulative_ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
1827 if let Some(c) = self.consumers.get_mut(&msg.topic) {
1828 c.cumulative_ack(msg).await
1829 } else {
1830 Err(ConnectionError::Unexpected(format!("no consumer for topic {}", msg.topic)).into())
1831 }
1832 }
1833
1834 async fn nack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
1835 if let Some(c) = self.consumers.get_mut(&msg.topic) {
1836 c.nack(msg).await?;
1837 Ok(())
1838 } else {
1839 Err(ConnectionError::Unexpected(format!("no consumer for topic {}", msg.topic)).into())
1840 }
1841 }
1842
1843 async fn seek(
1845 &mut self,
1846 consumer_ids: Option<Vec<String>>,
1847 message_id: Option<MessageIdData>,
1848 timestamp: Option<u64>,
1849 ) -> Result<(), Error> {
1850 match consumer_ids {
1852 Some(consumer_ids) => {
1853 let mut actions = Vec::default();
1855 for (consumer_id, consumer) in self.consumers.iter_mut() {
1856 if consumer_ids.contains(consumer_id) {
1857 actions.push(consumer.seek(message_id.clone(), timestamp));
1858 }
1859 }
1860 let mut v = futures::future::join_all(actions).await;
1862
1863 for res in v.drain(..) {
1864 if res.is_err() {
1865 return res;
1866 }
1867 }
1868
1869 Ok(())
1870 }
1871 None => Err(ConnectionError::Unexpected(format!(
1872 "no consumer for consumer ids {:?}",
1873 consumer_ids
1874 ))
1875 .into()),
1876 }
1877 }
1878
1879 fn config(&self) -> &ConsumerConfig {
1880 &self.config
1881 }
1882}
1883
1884pub struct Message<T> {
1888 pub topic: String,
1890 pub payload: Payload,
1892 pub message_id: MessageData,
1894 _phantom: PhantomData<T>,
1895}
1896
1897impl<T> Message<T> {
1898 pub fn metadata(&self) -> &MessageMetadata {
1900 &self.payload.metadata
1901 }
1902
1903 pub fn message_id(&self) -> &proto::MessageIdData {
1905 &self.message_id.id
1906 }
1907
1908 pub fn key(&self) -> Option<String> {
1910 self.payload.metadata.partition_key.clone()
1911 }
1912}
1913impl<T: DeserializeMessage> Message<T> {
1914 pub fn deserialize(&self) -> T::Output {
1916 T::deserialize_message(&self.payload)
1917 }
1918}
1919
1920impl<T: DeserializeMessage, Exe: Executor> Debug for MultiTopicConsumer<T, Exe> {
1921 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1922 write!(
1923 f,
1924 "MultiTopicConsumer({:?}, {:?})",
1925 &self.namespace, &self.topic_regex
1926 )
1927 }
1928}
1929
1930impl<T: DeserializeMessage, Exe: Executor> Stream for MultiTopicConsumer<T, Exe> {
1931 type Item = Result<Message<T>, Error>;
1932
1933 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1934 if let Some(mut new_consumers) = self.new_consumers.take() {
1935 match new_consumers.as_mut().poll(cx) {
1936 Poll::Ready(Ok(new_consumers)) => {
1937 self.add_consumers(new_consumers);
1938 }
1939 Poll::Pending => {
1940 self.new_consumers = Some(new_consumers);
1941 }
1942 Poll::Ready(Err(e)) => {
1943 error!("Error creating pulsar consumers: {}", e);
1944 }
1947 }
1948 }
1949
1950 if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
1951 self.update_topics();
1952 return self.poll_next(cx);
1953 }
1954
1955 let mut topics_to_remove = Vec::new();
1956 let mut result = None;
1957 for _ in 0..self.topics.len() {
1958 if result.is_some() {
1959 break;
1960 }
1961 let topic = self.topics.pop_front().unwrap();
1962 if let Some(item) = self
1963 .consumers
1964 .get_mut(&topic)
1965 .map(|c| c.as_mut().poll_next(cx))
1966 {
1967 match item {
1968 Poll::Pending => {}
1969 Poll::Ready(Some(Ok(msg))) => result = Some(msg),
1970 Poll::Ready(None) => {
1971 error!("Unexpected end of stream for pulsar topic {}", &topic);
1972 topics_to_remove.push(topic.clone());
1973 }
1974 Poll::Ready(Some(Err(e))) => {
1975 error!(
1976 "Unexpected error consuming from pulsar topic {}: {}",
1977 &topic, e
1978 );
1979 topics_to_remove.push(topic.clone());
1980 }
1981 }
1982 } else {
1983 eprintln!("BUG: Missing consumer for topic {}", &topic);
1984 }
1985 self.topics.push_back(topic);
1986 }
1987 self.remove_consumers(&topics_to_remove);
1988 if let Some(result) = result {
1989 return Poll::Ready(Some(Ok(result)));
1990 }
1991
1992 Poll::Pending
1993 }
1994}
1995
1996#[cfg(test)]
1997mod tests {
1998 use std::time::{SystemTime, UNIX_EPOCH};
1999
2000 use futures::{StreamExt, TryStreamExt};
2001 use log::LevelFilter;
2002 use regex::Regex;
2003 #[cfg(feature = "tokio-runtime")]
2004 use tokio::time::timeout;
2005
2006 #[cfg(feature = "tokio-runtime")]
2007 use crate::executor::TokioExecutor;
2008 use crate::{producer, tests::TEST_LOGGER, Pulsar, SerializeMessage};
2009
2010 use super::*;
2011 use futures::future::{select, Either};
2012
2013 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
2014 pub struct TestData {
2015 topic: String,
2016 msg: u32,
2017 }
2018
2019 impl<'a> SerializeMessage for &'a TestData {
2020 fn serialize_message(input: Self) -> Result<producer::Message, Error> {
2021 let payload = serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?;
2022 Ok(producer::Message {
2023 payload,
2024 ..Default::default()
2025 })
2026 }
2027 }
2028
2029 impl DeserializeMessage for TestData {
2030 type Output = Result<TestData, serde_json::Error>;
2031
2032 fn deserialize_message(payload: &Payload) -> Self::Output {
2033 serde_json::from_slice(&payload.data)
2034 }
2035 }
2036
2037 pub static MULTI_LOGGER: crate::tests::SimpleLogger = crate::tests::SimpleLogger {
2038 tag: "multi_consumer",
2039 };
2040 #[tokio::test]
2041 #[cfg(feature = "tokio-runtime")]
2042 async fn multi_consumer() {
2043 let _ = log::set_logger(&MULTI_LOGGER);
2044 let _ = log::set_max_level(LevelFilter::Debug);
2045 let addr = "pulsar://127.0.0.1:6650";
2046
2047 let topic_n: u16 = rand::random();
2048 let topic1 = format!("multi_consumer_a_{}", topic_n);
2049 let topic2 = format!("multi_consumer_b_{}", topic_n);
2050
2051 let data1 = TestData {
2052 topic: "a".to_owned(),
2053 msg: 1,
2054 };
2055 let data2 = TestData {
2056 topic: "a".to_owned(),
2057 msg: 2,
2058 };
2059 let data3 = TestData {
2060 topic: "b".to_owned(),
2061 msg: 3,
2062 };
2063 let data4 = TestData {
2064 topic: "b".to_owned(),
2065 msg: 4,
2066 };
2067
2068 let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2069
2070 try_join_all(vec![
2071 client.send(&topic1, &data1),
2072 client.send(&topic1, &data2),
2073 client.send(&topic2, &data3),
2074 client.send(&topic2, &data4),
2075 ])
2076 .await
2077 .unwrap();
2078
2079 let builder = client
2080 .consumer()
2081 .with_subscription_type(SubType::Shared)
2082 .with_options(ConsumerOptions {
2084 initial_position: InitialPosition::Earliest,
2085 ..Default::default()
2086 });
2087
2088 let consumer_1: Consumer<TestData, _> = builder
2089 .clone()
2090 .with_subscription("consumer_1")
2091 .with_topics(&[&topic1, &topic2])
2092 .build()
2093 .await
2094 .unwrap();
2095
2096 let consumer_2: Consumer<TestData, _> = builder
2097 .with_subscription("consumer_2")
2098 .with_topic_regex(Regex::new(&format!("multi_consumer_[ab]_{}", topic_n)).unwrap())
2099 .build()
2100 .await
2101 .unwrap();
2102
2103 let expected: HashSet<_> = vec![data1, data2, data3, data4].into_iter().collect();
2104 for consumer in [consumer_1, consumer_2].iter_mut() {
2105 let connected_topics = consumer.topics();
2106 debug!(
2107 "connected topics for {}: {:?}",
2108 consumer.subscription(),
2109 &connected_topics
2110 );
2111 assert_eq!(connected_topics.len(), 2);
2112 assert!(connected_topics.iter().any(|t| t.ends_with(&topic1)));
2113 assert!(connected_topics.iter().any(|t| t.ends_with(&topic2)));
2114
2115 let mut received = HashSet::new();
2116 while let Some(message) = timeout(Duration::from_secs(1), consumer.next())
2117 .await
2118 .unwrap()
2119 {
2120 received.insert(message.unwrap().deserialize().unwrap());
2121 if received.len() == 4 {
2122 break;
2123 }
2124 }
2125 assert_eq!(expected, received);
2126 assert_eq!(consumer.messages_received(), 4);
2127 assert!(consumer.last_message_received().is_some());
2128 }
2129 }
2130
2131 #[tokio::test]
2132 #[cfg(feature = "tokio-runtime")]
2133 async fn consumer_dropped_with_lingering_acks() {
2134 use rand::{distributions::Alphanumeric, Rng};
2135 let _ = log::set_logger(&TEST_LOGGER);
2136 let _ = log::set_max_level(LevelFilter::Debug);
2137 let addr = "pulsar://127.0.0.1:6650";
2138
2139 let topic = format!(
2140 "consumer_dropped_with_lingering_acks_{}",
2141 rand::random::<u16>()
2142 );
2143
2144 let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2145
2146 let message = TestData {
2147 topic: std::iter::repeat(())
2148 .map(|()| rand::thread_rng().sample(Alphanumeric) as char)
2149 .take(8)
2150 .map(|c| c as char)
2151 .collect(),
2152 msg: 1,
2153 };
2154
2155 client.send(&topic, &message).await.unwrap().await.unwrap();
2156 println!("producer sends done");
2157
2158 {
2159 println!("creating consumer");
2160 let mut consumer: Consumer<TestData, _> = client
2161 .consumer()
2162 .with_topic(&topic)
2163 .with_subscription("dropped_ack")
2164 .with_subscription_type(SubType::Shared)
2165 .with_options(ConsumerOptions {
2167 initial_position: InitialPosition::Earliest,
2168 ..Default::default()
2169 })
2170 .build()
2171 .await
2172 .unwrap();
2173
2174 println!("created consumer");
2175
2176 let msg: Message<TestData> = timeout(Duration::from_secs(1), consumer.next())
2178 .await
2179 .unwrap()
2180 .unwrap()
2181 .unwrap();
2182 println!("got message: {:?}", msg.payload);
2183 assert_eq!(
2184 message,
2185 msg.deserialize().unwrap(),
2186 "we probably receive a message from a previous run of the test"
2187 );
2188 consumer.ack(&msg).await.unwrap();
2189 }
2190
2191 {
2192 println!("creating second consumer. The message should have been acked");
2193 let mut consumer: Consumer<TestData, _> = client
2194 .consumer()
2195 .with_topic(&topic)
2196 .with_subscription("dropped_ack")
2197 .with_subscription_type(SubType::Shared)
2198 .with_options(ConsumerOptions {
2199 initial_position: InitialPosition::Earliest,
2200 ..Default::default()
2201 })
2202 .build()
2203 .await
2204 .unwrap();
2205
2206 println!("created second consumer");
2207
2208 let res: Result<_, tokio::time::error::Elapsed> =
2210 tokio::time::timeout(Duration::from_secs(1), consumer.next()).await;
2211 let is_err = res.is_err();
2212 if let Ok(val) = res {
2213 let msg = val.unwrap().unwrap();
2214 println!("got message: {:?}", msg.payload);
2215 consumer.ack(&msg).await.unwrap();
2217 assert_eq!(message, msg.deserialize().unwrap());
2219 }
2220
2221 assert!(is_err, "waiting for a message should have timed out, since we already acknowledged the only message in the queue");
2222 }
2223 }
2224
2225 #[tokio::test]
2226 #[cfg(feature = "tokio-runtime")]
2227 async fn dead_letter_queue() {
2228 let _ = log::set_logger(&TEST_LOGGER);
2229 let _ = log::set_max_level(LevelFilter::Debug);
2230 let addr = "pulsar://127.0.0.1:6650";
2231
2232 let test_id: u16 = rand::random();
2233 let topic = format!("dead_letter_queue_test_{}", test_id);
2234 let test_msg: u32 = rand::random();
2235
2236 let message = TestData {
2237 topic: topic.clone(),
2238 msg: test_msg,
2239 };
2240
2241 let dead_letter_topic = format!("{}_dlq", topic);
2242
2243 let dead_letter_policy = crate::consumer::DeadLetterPolicy {
2244 max_redeliver_count: 1,
2245 dead_letter_topic: dead_letter_topic.clone(),
2246 };
2247
2248 let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2249
2250 println!("creating consumer");
2251 let mut consumer: Consumer<TestData, _> = client
2252 .consumer()
2253 .with_topic(topic.clone())
2254 .with_subscription("nack")
2255 .with_subscription_type(SubType::Shared)
2256 .with_dead_letter_policy(dead_letter_policy)
2257 .build()
2258 .await
2259 .unwrap();
2260
2261 println!("created consumer");
2262
2263 println!("creating second consumer that consumes from the DLQ");
2264 let mut dlq_consumer: Consumer<TestData, _> = client
2265 .clone()
2266 .consumer()
2267 .with_topic(dead_letter_topic)
2268 .with_subscription("dead_letter_topic")
2269 .with_subscription_type(SubType::Shared)
2270 .build()
2271 .await
2272 .unwrap();
2273
2274 println!("created second consumer");
2275
2276 client.send(&topic, &message).await.unwrap().await.unwrap();
2277 println!("producer sends done");
2278
2279 let msg = consumer.next().await.unwrap().unwrap();
2280 println!("got message: {:?}", msg.payload);
2281 assert_eq!(
2282 message,
2283 msg.deserialize().unwrap(),
2284 "we probably received a message from a previous run of the test"
2285 );
2286 consumer.nack(&msg).await.unwrap();
2288
2289 let dlq_msg = dlq_consumer.next().await.unwrap().unwrap();
2290 println!("got message: {:?}", msg.payload);
2291 assert_eq!(
2292 message,
2293 dlq_msg.deserialize().unwrap(),
2294 "we probably received a message from a previous run of the test"
2295 );
2296 dlq_consumer.ack(&dlq_msg).await.unwrap();
2297 }
2298
2299 #[tokio::test]
2300 #[cfg(feature = "tokio-runtime")]
2301 async fn failover() {
2302 let _ = log::set_logger(&MULTI_LOGGER);
2303 let _ = log::set_max_level(LevelFilter::Debug);
2304 let addr = "pulsar://127.0.0.1:6650";
2305 let topic = format!("failover_{}", rand::random::<u16>());
2306 let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2307
2308 let msg_count = 100_u32;
2309 try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string())))
2310 .await
2311 .unwrap();
2312
2313 let builder = client
2314 .consumer()
2315 .with_subscription("failover")
2316 .with_topic(&topic)
2317 .with_subscription_type(SubType::Failover)
2318 .with_options(ConsumerOptions {
2320 initial_position: InitialPosition::Earliest,
2321 ..Default::default()
2322 });
2323
2324 let mut consumer_1: Consumer<String, _> = builder.clone().build().await.unwrap();
2325
2326 let mut consumer_2: Consumer<String, _> = builder.build().await.unwrap();
2327
2328 let mut consumed_1 = 0_u32;
2329 let mut consumed_2 = 0_u32;
2330 let mut pending_1 = Some(consumer_1.next());
2331 let mut pending_2 = Some(consumer_2.next());
2332 while consumed_1 + consumed_2 < msg_count {
2333 let next = select(pending_1.take().unwrap(), pending_2.take().unwrap());
2334 match timeout(Duration::from_secs(2), next).await.unwrap() {
2335 Either::Left((msg, pending)) => {
2336 consumed_1 += 1;
2337 let _ = consumer_1.ack(&msg.unwrap().unwrap());
2338 pending_1 = Some(consumer_1.next());
2339 pending_2 = Some(pending);
2340 }
2341 Either::Right((msg, pending)) => {
2342 consumed_2 += 1;
2343 let _ = consumer_2.ack(&msg.unwrap().unwrap());
2344 pending_1 = Some(pending);
2345 pending_2 = Some(consumer_2.next());
2346 }
2347 }
2348 }
2349 match (consumed_1, consumed_2) {
2350 (consumed_1, 0) => assert_eq!(consumed_1, msg_count),
2351 (0, consumed_2) => assert_eq!(consumed_2, msg_count),
2352 _ => panic!("Expected one consumer to consume all messages. Message count: {}, consumer_1: {} consumer_2: {}", msg_count, consumed_1, consumed_2),
2353 }
2354 }
2355
2356 #[tokio::test]
2357 #[cfg(feature = "tokio-runtime")]
2358 async fn seek_single_consumer() {
2359 let _ = log::set_logger(&MULTI_LOGGER);
2360 let _ = log::set_max_level(LevelFilter::Debug);
2361 log::info!("starting seek test");
2362 let addr = "pulsar://127.0.0.1:6650";
2363 let topic = format!("seek_{}", rand::random::<u16>());
2364 let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2365
2366 let msg_count = 100_u32;
2368
2369 let start_time: u64 = SystemTime::now()
2370 .duration_since(UNIX_EPOCH)
2371 .unwrap()
2372 .as_millis() as u64;
2373
2374 std::thread::sleep(Duration::from_secs(2));
2375
2376 println!("this is the starting time: {}", start_time);
2377
2378 try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string())))
2379 .await
2380 .unwrap();
2381 log::info!("sent all messages");
2382
2383 let mut consumer_1: Consumer<String, _> = client
2384 .consumer()
2385 .with_consumer_name("seek_single_test")
2386 .with_subscription("seek_single_test")
2387 .with_subscription_type(SubType::Shared)
2388 .with_topic(&topic)
2389 .build()
2390 .await
2391 .unwrap();
2392
2393 log::info!("built the consumer");
2394
2395 let mut consumed_1 = 0_u32;
2396 while let Some(msg) = consumer_1.try_next().await.unwrap() {
2397 consumer_1.ack(&msg).await.unwrap();
2398 let publish_time = msg.metadata().publish_time;
2399 let data = match msg.deserialize() {
2400 Ok(data) => data,
2401 Err(e) => {
2402 log::error!("could not deserialize message: {:?}", e);
2403 break;
2404 }
2405 };
2406
2407 consumed_1 += 1;
2408 log::info!(
2409 "first loop, got {} messages, content: {}, publish time: {}",
2410 consumed_1,
2411 data,
2412 publish_time
2413 );
2414
2415 if consumed_1 >= msg_count / 2 {
2417 log::info!("first loop, received {} messages, so break", consumed_1);
2418 break;
2419 }
2420 }
2421
2422 log::info!("calling seek method");
2424 let _seek_result = consumer_1
2425 .seek(None, None, Some(start_time), client)
2426 .await
2427 .unwrap();
2428
2429 let mut consumed_2 = 0_u32;
2440 log::info!("reading messages again");
2441 while let Some(msg) = consumer_1.try_next().await.unwrap() {
2442 let publish_time = msg.metadata().publish_time;
2443 consumer_1.ack(&msg).await.unwrap();
2444 let data = match msg.deserialize() {
2445 Ok(data) => data,
2446 Err(e) => {
2447 log::error!("could not deserialize message: {:?}", e);
2448 break;
2449 }
2450 };
2451 consumed_2 += 1;
2452 log::info!(
2453 "second loop, got {} messages, content: {}, publish time: {}",
2454 consumed_2,
2455 data,
2456 publish_time,
2457 );
2458
2459 if consumed_2 >= msg_count {
2460 log::info!("received {} messagses, so break", consumed_2);
2461 break;
2462 }
2463 }
2464
2465 assert_eq!(50, consumed_1);
2467 assert_eq!(100, consumed_2);
2468 }
2469}