pulsar/
consumer.rs

1//! Topic subscriptions
2use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
3use std::fmt::Debug;
4use std::marker::PhantomData;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use chrono::{DateTime, Utc};
10use futures::channel::mpsc::unbounded;
11use futures::task::{Context, Poll};
12use futures::{
13    channel::{mpsc, oneshot},
14    future::{select, try_join_all, Either},
15    pin_mut, Future, FutureExt, SinkExt, Stream, StreamExt,
16};
17use regex::Regex;
18
19use crate::connection::Connection;
20use crate::error::{ConnectionError, ConsumerError, Error};
21use crate::executor::Executor;
22use crate::message::proto::CommandMessage;
23use crate::message::{
24    parse_batched_message,
25    proto::{self, command_subscribe::SubType, MessageIdData, MessageMetadata, Schema},
26    BatchedMessage, Message as RawMessage, Metadata, Payload,
27};
28use crate::proto::{BaseCommand, CommandCloseConsumer};
29use crate::reader::{Reader, State};
30use crate::{BrokerAddress, DeserializeMessage, Pulsar};
31use core::iter;
32use rand::distributions::Alphanumeric;
33use rand::Rng;
34use std::convert::TryFrom;
35use url::Url;
36
37/// Configuration options for consumers
38#[derive(Clone, Default, Debug)]
39pub struct ConsumerOptions {
40    pub priority_level: Option<i32>,
41    /// Signal wether the subscription should be backed by a
42    /// durable cursor or not
43    pub durable: Option<bool>,
44    /// If specified, the subscription will position the cursor
45    /// marked-delete position on the particular message id and
46    /// will send messages from that point
47    pub start_message_id: Option<MessageIdData>,
48    /// Add optional metadata key=value to this consumer
49    pub metadata: BTreeMap<String, String>,
50    pub read_compacted: Option<bool>,
51    pub schema: Option<Schema>,
52    /// Signal whether the subscription will initialize on latest
53    /// or earliest message (default on latest)
54    ///
55    /// an enum can be used to initialize it:
56    ///
57    /// ```rust,ignore
58    /// ConsumerOptions {
59    ///     initial_position: InitialPosition::Earliest,
60    /// }
61    /// ```
62    pub initial_position: InitialPosition,
63}
64
65impl ConsumerOptions {
66    /// within options, sets the priority level
67    pub fn with_priority_level(mut self, priority_level: i32) -> Self {
68        self.priority_level = Some(priority_level);
69        self
70    }
71
72    pub fn durable(mut self, durable: bool) -> Self {
73        self.durable = Some(durable);
74        self
75    }
76
77    pub fn starting_on_message(mut self, message_id_data: MessageIdData) -> Self {
78        self.start_message_id = Some(message_id_data);
79        self
80    }
81
82    pub fn with_metadata(mut self, metadata: BTreeMap<String, String>) -> Self {
83        self.metadata = metadata;
84        self
85    }
86
87    pub fn read_compacted(mut self, read_compacted: bool) -> Self {
88        self.read_compacted = Some(read_compacted);
89        self
90    }
91
92    pub fn with_schema(mut self, schema: Schema) -> Self {
93        self.schema = Some(schema);
94        self
95    }
96
97    pub fn with_initial_position(mut self, initial_position: InitialPosition) -> Self {
98        self.initial_position = initial_position;
99        self
100    }
101}
102
103#[derive(Debug, Clone)]
104pub struct DeadLetterPolicy {
105    /// Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
106    pub max_redeliver_count: usize,
107    /// Name of the dead topic where the failing messages will be sent.
108    pub dead_letter_topic: String,
109}
110
111/// position of the first message that will be consumed
112#[derive(Clone, Debug)]
113pub enum InitialPosition {
114    /// start at the oldest message
115    Earliest,
116    /// start at the most recent message
117    Latest,
118}
119
120impl Default for InitialPosition {
121    fn default() -> Self {
122        InitialPosition::Latest
123    }
124}
125impl From<InitialPosition> for i32 {
126    fn from(i: InitialPosition) -> Self {
127        match i {
128            InitialPosition::Earliest => 1,
129            InitialPosition::Latest => 0,
130        }
131    }
132}
133
134/// the consumer is used to subscribe to a topic
135///
136/// ```rust,no_run
137/// use pulsar::{Consumer, SubType};
138/// use futures::StreamExt;
139///
140/// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
141/// # type TestData = String;
142/// let mut consumer: Consumer<TestData, _> = pulsar
143///     .consumer()
144///     .with_topic("non-persistent://public/default/test")
145///     .with_consumer_name("test_consumer")
146///     .with_subscription_type(SubType::Exclusive)
147///     .with_subscription("test_subscription")
148///     .build()
149///     .await?;
150///
151/// let mut counter = 0usize;
152/// while let Some(Ok(msg)) = consumer.next().await {
153///     consumer.ack(&msg).await?;
154///     let data = match msg.deserialize() {
155///         Ok(data) => data,
156///         Err(e) => {
157///             log::error!("could not deserialize message: {:?}", e);
158///             break;
159///         }
160///     };
161///
162///     counter += 1;
163///     log::info!("got {} messages", counter);
164/// }
165/// # Ok(())
166/// # }
167/// ```
168pub struct Consumer<T: DeserializeMessage, Exe: Executor> {
169    inner: InnerConsumer<T, Exe>,
170}
171impl<T: DeserializeMessage, Exe: Executor> Consumer<T, Exe> {
172    /// creates a [ConsumerBuilder] from a client instance
173    pub fn builder(pulsar: &Pulsar<Exe>) -> ConsumerBuilder<Exe> {
174        ConsumerBuilder::new(pulsar)
175    }
176
177    /// test that the connections to the Pulsar brokers are still valid
178    pub async fn check_connection(&mut self) -> Result<(), Error> {
179        match &mut self.inner {
180            InnerConsumer::Single(c) => c.check_connection().await,
181            InnerConsumer::Multi(c) => c.check_connections().await,
182        }
183    }
184
185    /// acknowledges a single message
186    pub async fn ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
187        match &mut self.inner {
188            InnerConsumer::Single(c) => c.ack(msg).await,
189            InnerConsumer::Multi(c) => c.ack(msg).await,
190        }
191    }
192
193    /// acknowledges a message and all the preceding messages
194    pub async fn cumulative_ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
195        match &mut self.inner {
196            InnerConsumer::Single(c) => c.cumulative_ack(msg).await,
197            InnerConsumer::Multi(c) => c.cumulative_ack(msg).await,
198        }
199    }
200
201    /// negative acknowledgement
202    ///
203    /// the message will be sent again on the subscription
204    pub async fn nack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
205        match &mut self.inner {
206            InnerConsumer::Single(c) => c.nack(msg).await,
207            InnerConsumer::Multi(c) => c.nack(msg).await,
208        }
209    }
210
211    /// seek currently destroys the existing consumer and creates a new one
212    /// this is how java and cpp pulsar client implement this feature mainly because
213    /// there are many minor problems with flushing existing messages and receiving new ones
214    pub async fn seek(
215        &mut self,
216        consumer_ids: Option<Vec<String>>,
217        message_id: Option<MessageIdData>,
218        timestamp: Option<u64>,
219        client: Pulsar<Exe>,
220    ) -> Result<(), Error> {
221        let inner_consumer: InnerConsumer<T, Exe> = match &mut self.inner {
222            InnerConsumer::Single(c) => {
223                c.seek(message_id, timestamp).await?;
224                let topic = c.topic().to_string();
225                let addr = client.lookup_topic(&topic).await?;
226                let config = c.config().clone();
227                InnerConsumer::Single(TopicConsumer::new(client, topic, addr, config).await?)
228            }
229            InnerConsumer::Multi(c) => {
230                c.seek(consumer_ids, message_id, timestamp).await?;
231                let topics = c.topics();
232
233                //currently, pulsar only supports seek for non partitioned topics
234                let addrs =
235                    try_join_all(topics.into_iter().map(|topic| client.lookup_topic(topic)))
236                        .await?;
237
238                let topic_addr_pair = c.topics.iter().cloned().zip(addrs.iter().cloned());
239
240                let consumers = try_join_all(topic_addr_pair.map(|(topic, addr)| {
241                    TopicConsumer::new(client.clone(), topic, addr, c.config().clone())
242                }))
243                .await?;
244
245                let consumers: BTreeMap<_, _> = consumers
246                    .into_iter()
247                    .map(|c| (c.topic(), Box::pin(c)))
248                    .collect();
249                let topics = consumers.keys().cloned().collect();
250                let topic_refresh = Duration::from_secs(30);
251                let refresh = Box::pin(client.executor.interval(topic_refresh).map(drop));
252                let namespace = c.namespace.clone();
253                let config = c.config().clone();
254                let topic_regex = c.topic_regex.clone();
255                InnerConsumer::Multi(MultiTopicConsumer {
256                    namespace,
257                    topic_regex,
258                    pulsar: client,
259                    consumers,
260                    topics,
261                    new_consumers: None,
262                    refresh,
263                    config,
264                    disc_last_message_received: None,
265                    disc_messages_received: 0,
266                })
267            }
268        };
269
270        self.inner = inner_consumer;
271        Ok(())
272    }
273
274    pub async fn unsubscribe(&mut self) -> Result<(), Error> {
275        match &mut self.inner {
276            InnerConsumer::Single(c) => c.unsubscribe().await,
277            InnerConsumer::Multi(c) => c.unsubscribe().await,
278        }
279    }
280
281    /// returns the list of topics this consumer is subscribed on
282    pub fn topics(&self) -> Vec<String> {
283        match &self.inner {
284            InnerConsumer::Single(c) => vec![c.topic()],
285            InnerConsumer::Multi(c) => c.topics(),
286        }
287    }
288
289    /// returns a list of broker URLs this consumer is connnected to
290    pub async fn connections(&mut self) -> Result<Vec<Url>, Error> {
291        match &mut self.inner {
292            InnerConsumer::Single(c) => Ok(vec![c.connection().await?.url().clone()]),
293            InnerConsumer::Multi(c) => {
294                let v = c
295                    .consumers
296                    .values_mut()
297                    .map(|c| c.connection())
298                    .collect::<Vec<_>>();
299
300                let mut connections = try_join_all(v).await?;
301                Ok(connections
302                    .drain(..)
303                    .map(|conn| conn.url().clone())
304                    .collect::<BTreeSet<_>>()
305                    .into_iter()
306                    .collect())
307            }
308        }
309    }
310
311    /// returns the consumer's configuration options
312    pub fn options(&self) -> &ConsumerOptions {
313        match &self.inner {
314            InnerConsumer::Single(c) => &c.config.options,
315            InnerConsumer::Multi(c) => &c.config.options,
316        }
317    }
318
319    /// returns the consumer's dead letter policy options
320    pub fn dead_letter_policy(&self) -> Option<&DeadLetterPolicy> {
321        match &self.inner {
322            InnerConsumer::Single(c) => c.dead_letter_policy.as_ref(),
323            InnerConsumer::Multi(c) => c.config.dead_letter_policy.as_ref(),
324        }
325    }
326
327    /// returns the consumer's subscription name
328    pub fn subscription(&self) -> &str {
329        match &self.inner {
330            InnerConsumer::Single(c) => &c.config.subscription,
331            InnerConsumer::Multi(c) => &c.config.subscription,
332        }
333    }
334
335    /// returns the consumer's subscription type
336    pub fn sub_type(&self) -> SubType {
337        match &self.inner {
338            InnerConsumer::Single(c) => c.config.sub_type,
339            InnerConsumer::Multi(c) => c.config.sub_type,
340        }
341    }
342
343    /// returns the consumer's batch size
344    pub fn batch_size(&self) -> Option<u32> {
345        match &self.inner {
346            InnerConsumer::Single(c) => c.config.batch_size,
347            InnerConsumer::Multi(c) => c.config.batch_size,
348        }
349    }
350
351    /// returns the consumer's name
352    pub fn consumer_name(&self) -> Option<&str> {
353        match &self.inner {
354            InnerConsumer::Single(c) => &c.config.consumer_name,
355            InnerConsumer::Multi(c) => &c.config.consumer_name,
356        }
357        .as_ref()
358        .map(|s| s.as_str())
359    }
360
361    /// returns the consumer's list of ids
362    pub fn consumer_id(&self) -> Vec<u64> {
363        match &self.inner {
364            InnerConsumer::Single(c) => vec![c.consumer_id],
365            InnerConsumer::Multi(c) => c.consumers.values().map(|c| c.consumer_id).collect(),
366        }
367    }
368
369    /// returns the consumer's redelivery delay
370    ///
371    /// if messages are not acknowledged before this delay, they will be sent
372    /// again on the subscription
373    pub fn unacked_message_redelivery_delay(&self) -> Option<Duration> {
374        match &self.inner {
375            InnerConsumer::Single(c) => c.config.unacked_message_redelivery_delay,
376            InnerConsumer::Multi(c) => c.config.unacked_message_redelivery_delay,
377        }
378    }
379
380    /// returns the date of the last message reception
381    pub fn last_message_received(&self) -> Option<DateTime<Utc>> {
382        match &self.inner {
383            InnerConsumer::Single(c) => c.last_message_received(),
384            InnerConsumer::Multi(c) => c.last_message_received(),
385        }
386    }
387
388    /// returns the current number of messages received
389    pub fn messages_received(&self) -> u64 {
390        match &self.inner {
391            InnerConsumer::Single(c) => c.messages_received(),
392            InnerConsumer::Multi(c) => c.messages_received(),
393        }
394    }
395}
396
397impl<T: DeserializeMessage, Exe: Executor> Stream for Consumer<T, Exe> {
398    type Item = Result<Message<T>, Error>;
399
400    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
401        match &mut self.inner {
402            InnerConsumer::Single(c) => Pin::new(c).poll_next(cx),
403            InnerConsumer::Multi(c) => Pin::new(c).poll_next(cx),
404        }
405    }
406}
407
408enum InnerConsumer<T: DeserializeMessage, Exe: Executor> {
409    Single(TopicConsumer<T, Exe>),
410    Multi(MultiTopicConsumer<T, Exe>),
411}
412
413type MessageIdDataReceiver = mpsc::Receiver<Result<(proto::MessageIdData, Payload), Error>>;
414
415// this is entirely public for use in reader.rs
416pub(crate) struct TopicConsumer<T: DeserializeMessage, Exe: Executor> {
417    pub(crate) consumer_id: u64,
418    pub(crate) config: ConsumerConfig,
419    topic: String,
420    messages: Pin<Box<MessageIdDataReceiver>>,
421    engine_tx: mpsc::UnboundedSender<EngineMessage<Exe>>,
422    #[allow(unused)]
423    data_type: PhantomData<fn(Payload) -> T::Output>,
424    pub(crate) dead_letter_policy: Option<DeadLetterPolicy>,
425    last_message_received: Option<DateTime<Utc>>,
426    messages_received: u64,
427}
428
429impl<T: DeserializeMessage, Exe: Executor> TopicConsumer<T, Exe> {
430    async fn new(
431        client: Pulsar<Exe>,
432        topic: String,
433        mut addr: BrokerAddress,
434        config: ConsumerConfig,
435    ) -> Result<TopicConsumer<T, Exe>, Error> {
436        let ConsumerConfig {
437            subscription,
438            sub_type,
439            batch_size,
440            consumer_name,
441            consumer_id,
442            unacked_message_redelivery_delay,
443            options,
444            dead_letter_policy,
445        } = config.clone();
446        let consumer_id = consumer_id.unwrap_or_else(rand::random);
447        let (resolver, messages) = mpsc::unbounded();
448        let batch_size = batch_size.unwrap_or(1000);
449
450        let mut connection = client.manager.get_connection(&addr).await?;
451        let mut current_retries = 0u32;
452        let start = std::time::Instant::now();
453        let operation_retry_options = client.operation_retry_options.clone();
454
455        loop {
456            match connection
457                .sender()
458                .subscribe(
459                    resolver.clone(),
460                    topic.clone(),
461                    subscription.clone(),
462                    sub_type,
463                    consumer_id,
464                    consumer_name.clone(),
465                    options.clone(),
466                )
467                .await
468            {
469                Ok(_) => {
470                    if current_retries > 0 {
471                        let dur = (std::time::Instant::now() - start).as_secs();
472                        log::info!(
473                            "subscribe({}) success after {} retries over {} seconds",
474                            topic,
475                            current_retries + 1,
476                            dur
477                        );
478                    }
479                    break;
480                }
481                Err(ConnectionError::PulsarError(
482                    Some(proto::ServerError::ServiceNotReady),
483                    text,
484                )) => {
485                    if operation_retry_options.max_retries.is_none()
486                        || operation_retry_options.max_retries.unwrap() > current_retries
487                    {
488                        error!("subscribe({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}",
489                        topic, operation_retry_options.retry_delay.as_millis(),
490                        operation_retry_options.max_retries, text.unwrap_or_else(String::new));
491
492                        current_retries += 1;
493                        client
494                            .executor
495                            .delay(operation_retry_options.retry_delay)
496                            .await;
497
498                        // we need to look up again the topic's address
499                        let prev = addr;
500                        addr = client.lookup_topic(&topic).await?;
501                        if prev != addr {
502                            info!(
503                                "topic {} moved: previous = {:?}, new = {:?}",
504                                topic, prev, addr
505                            );
506                        }
507
508                        connection = client.manager.get_connection(&addr).await?;
509                        continue;
510                    } else {
511                        error!("subscribe({}) reached max retries", topic);
512
513                        return Err(ConnectionError::PulsarError(
514                            Some(proto::ServerError::ServiceNotReady),
515                            text,
516                        )
517                        .into());
518                    }
519                }
520                Err(e) => return Err(Error::Connection(e)),
521            }
522        }
523
524        connection
525            .sender()
526            .send_flow(consumer_id, batch_size)
527            .map_err(|e| {
528                error!("TopicConsumer::new error[{}]: {:?}", line!(), e);
529                e
530            })
531            .map_err(|e| Error::Consumer(ConsumerError::Connection(e)))?;
532
533        let (engine_tx, engine_rx) = unbounded();
534        // drop_signal will be dropped when Consumer is dropped, then
535        // drop_receiver will return, and we can close the consumer
536        let (_drop_signal, drop_receiver) = oneshot::channel::<()>();
537        let conn = connection.clone();
538        //let ack_sender = nack_handler.clone();
539        let name = consumer_name.clone();
540        let topic_name = topic.clone();
541        let _ = client.executor.spawn(Box::pin(async move {
542            let _res = drop_receiver.await;
543            // if we receive a message, it indicates we want to stop this task
544            if _res.is_err() {
545                if let Err(e) = conn.sender().close_consumer(consumer_id).await {
546                    error!(
547                        "could not close consumer {:?}({}) for topic {}: {:?}",
548                        consumer_name, consumer_id, topic_name, e
549                    );
550                }
551            }
552        }));
553
554        if unacked_message_redelivery_delay.is_some() {
555            let mut redelivery_tx = engine_tx.clone();
556            let mut interval = client.executor.interval(Duration::from_millis(500));
557            let res = client.executor.spawn(Box::pin(async move {
558                while interval.next().await.is_some() {
559                    if redelivery_tx
560                        .send(EngineMessage::UnackedRedelivery)
561                        .await
562                        .is_err()
563                    {
564                        // Consumer shut down - stop ticker
565                        break;
566                    }
567                }
568            }));
569            if res.is_err() {
570                return Err(Error::Executor);
571            }
572        }
573        let (tx, rx) = mpsc::channel(1000);
574        let mut c = ConsumerEngine::new(
575            client.clone(),
576            connection.clone(),
577            topic.clone(),
578            subscription.clone(),
579            sub_type,
580            consumer_id,
581            name,
582            tx,
583            messages,
584            engine_rx,
585            batch_size,
586            unacked_message_redelivery_delay,
587            dead_letter_policy.clone(),
588            options.clone(),
589            _drop_signal,
590        );
591        let f = async move {
592            c.engine()
593                .map(|res| {
594                    debug!("consumer engine stopped: {:?}", res);
595                })
596                .await;
597        };
598        if client.executor.spawn(Box::pin(f)).is_err() {
599            return Err(Error::Executor);
600        }
601
602        Ok(TopicConsumer {
603            consumer_id,
604            config,
605            topic,
606            messages: Box::pin(rx),
607            engine_tx,
608            data_type: PhantomData,
609            dead_letter_policy,
610            last_message_received: None,
611            messages_received: 0,
612        })
613    }
614
615    pub fn topic(&self) -> String {
616        self.topic.clone()
617    }
618
619    pub async fn connection(&mut self) -> Result<Arc<Connection<Exe>>, Error> {
620        let (resolver, response) = oneshot::channel();
621        self.engine_tx
622            .send(EngineMessage::GetConnection(resolver))
623            .await
624            .map_err(|_| ConsumerError::Connection(ConnectionError::Disconnected))?;
625
626        response.await.map_err(|oneshot::Canceled| {
627            error!("the consumer engine dropped the request");
628            ConnectionError::Disconnected.into()
629        })
630    }
631
632    pub async fn check_connection(&mut self) -> Result<(), Error> {
633        let conn = self.connection().await?;
634        info!("check connection for id {}", conn.id());
635        conn.sender().send_ping().await?;
636        Ok(())
637    }
638
639    pub async fn ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
640        self.engine_tx
641            .send(EngineMessage::Ack(msg.message_id.clone(), false))
642            .await?;
643        Ok(())
644    }
645
646    pub(crate) fn acker(&self) -> mpsc::UnboundedSender<EngineMessage<Exe>> {
647        self.engine_tx.clone()
648    }
649
650    async fn cumulative_ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
651        self.engine_tx
652            .send(EngineMessage::Ack(msg.message_id.clone(), true))
653            .await?;
654        Ok(())
655    }
656
657    async fn nack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
658        self.engine_tx
659            .send(EngineMessage::Nack(msg.message_id.clone()))
660            .await?;
661        Ok(())
662    }
663
664    pub async fn seek(
665        &mut self,
666        message_id: Option<MessageIdData>,
667        timestamp: Option<u64>,
668    ) -> Result<(), Error> {
669        let consumer_id = self.consumer_id;
670        self.connection()
671            .await?
672            .sender()
673            .seek(consumer_id, message_id, timestamp)
674            .await?;
675        Ok(())
676    }
677
678    pub async fn unsubscribe(&mut self) -> Result<(), Error> {
679        let consumer_id = self.consumer_id;
680        self.connection()
681            .await?
682            .sender()
683            .unsubscribe(consumer_id)
684            .await?;
685        Ok(())
686    }
687
688    pub fn last_message_received(&self) -> Option<DateTime<Utc>> {
689        self.last_message_received
690    }
691
692    pub fn messages_received(&self) -> u64 {
693        self.messages_received
694    }
695
696    fn config(&self) -> &ConsumerConfig {
697        &self.config
698    }
699
700    fn create_message(&self, message_id: proto::MessageIdData, payload: Payload) -> Message<T> {
701        Message {
702            topic: self.topic.clone(),
703            message_id: MessageData {
704                id: message_id,
705                batch_size: payload.metadata.num_messages_in_batch,
706            },
707            payload,
708            _phantom: PhantomData,
709        }
710    }
711}
712
713impl<T: DeserializeMessage, Exe: Executor> Stream for TopicConsumer<T, Exe> {
714    type Item = Result<Message<T>, Error>;
715
716    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
717        match self.messages.as_mut().poll_next(cx) {
718            Poll::Pending => Poll::Pending,
719            Poll::Ready(None) => Poll::Ready(None),
720            Poll::Ready(Some(Ok((id, payload)))) => {
721                self.last_message_received = Some(Utc::now());
722                self.messages_received += 1;
723                Poll::Ready(Some(Ok(self.create_message(id, payload))))
724            }
725            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
726        }
727    }
728}
729
730struct ConsumerEngine<Exe: Executor> {
731    client: Pulsar<Exe>,
732    connection: Arc<Connection<Exe>>,
733    topic: String,
734    subscription: String,
735    sub_type: SubType,
736    id: u64,
737    name: Option<String>,
738    tx: mpsc::Sender<Result<(proto::MessageIdData, Payload), Error>>,
739    messages_rx: Option<mpsc::UnboundedReceiver<RawMessage>>,
740    engine_rx: Option<mpsc::UnboundedReceiver<EngineMessage<Exe>>>,
741    batch_size: u32,
742    remaining_messages: u32,
743    unacked_message_redelivery_delay: Option<Duration>,
744    unacked_messages: HashMap<MessageIdData, Instant>,
745    dead_letter_policy: Option<DeadLetterPolicy>,
746    options: ConsumerOptions,
747    _drop_signal: oneshot::Sender<()>,
748}
749
750pub(crate) enum EngineMessage<Exe: Executor> {
751    Ack(MessageData, bool),
752    Nack(MessageData),
753    UnackedRedelivery,
754    GetConnection(oneshot::Sender<Arc<Connection<Exe>>>),
755}
756
757impl<Exe: Executor> ConsumerEngine<Exe> {
758    fn new(
759        client: Pulsar<Exe>,
760        connection: Arc<Connection<Exe>>,
761        topic: String,
762        subscription: String,
763        sub_type: SubType,
764        id: u64,
765        name: Option<String>,
766        tx: mpsc::Sender<Result<(proto::MessageIdData, Payload), Error>>,
767        messages_rx: mpsc::UnboundedReceiver<RawMessage>,
768        engine_rx: mpsc::UnboundedReceiver<EngineMessage<Exe>>,
769        batch_size: u32,
770        unacked_message_redelivery_delay: Option<Duration>,
771        dead_letter_policy: Option<DeadLetterPolicy>,
772        options: ConsumerOptions,
773        _drop_signal: oneshot::Sender<()>,
774    ) -> ConsumerEngine<Exe> {
775        ConsumerEngine {
776            client,
777            connection,
778            topic,
779            subscription,
780            sub_type,
781            id,
782            name,
783            tx,
784            messages_rx: Some(messages_rx),
785            engine_rx: Some(engine_rx),
786            batch_size,
787            remaining_messages: batch_size,
788            unacked_message_redelivery_delay,
789            unacked_messages: HashMap::new(),
790            dead_letter_policy,
791            options,
792            _drop_signal,
793        }
794    }
795
796    async fn engine(&mut self) -> Result<(), Error> {
797        debug!("starting the consumer engine for topic {}", self.topic);
798        let mut messages_or_ack_f = None;
799        loop {
800            if !self.connection.is_valid() {
801                if let Some(err) = self.connection.error() {
802                    error!(
803                        "Consumer: connection {} is not valid: {:?}",
804                        self.connection.id(),
805                        err
806                    );
807                    self.reconnect().await?;
808                }
809            }
810
811            if self.remaining_messages < self.batch_size / 2 {
812                match self
813                    .connection
814                    .sender()
815                    .send_flow(self.id, self.batch_size - self.remaining_messages)
816                {
817                    Ok(()) => {}
818                    Err(ConnectionError::Disconnected) => {
819                        self.reconnect().await?;
820                        self.connection
821                            .sender()
822                            .send_flow(self.id, self.batch_size - self.remaining_messages)?;
823                    }
824                    Err(e) => return Err(e.into()),
825                }
826                self.remaining_messages = self.batch_size;
827            }
828
829            let mut f = match messages_or_ack_f.take() {
830                None => {
831                    // we need these complicated steps to select on two streams of different types,
832                    // while being able to store it in the ConsumerEngine object (biggest issue),
833                    // and replacing messages_rx when we reconnect, and considering that engine_rx is
834                    // not clonable.
835                    // Please, someone find a better solution
836                    let messages_f = self.messages_rx.take().unwrap().into_future();
837                    let ack_f = self.engine_rx.take().unwrap().into_future();
838                    select(messages_f, ack_f)
839                }
840                Some(f) => f,
841            };
842
843            // we want to wake up regularly to check if the connection is still valid:
844            // if the heartbeat failed, the connection.is_valid() call at the beginning
845            // of the loop should fail, but to get there we must stop waiting on
846            // messages_f and ack_f
847            let delay_f = self.client.executor.delay(Duration::from_secs(1));
848            let f_pin = std::pin::Pin::new(&mut f);
849            pin_mut!(delay_f);
850
851            let f = match select(f_pin, delay_f).await {
852                Either::Left((res, _)) => res,
853                Either::Right((_, _f)) => {
854                    messages_or_ack_f = Some(f);
855                    continue;
856                }
857            };
858
859            match f {
860                Either::Left(((message_opt, messages_rx), engine_rx)) => {
861                    self.messages_rx = Some(messages_rx);
862                    self.engine_rx = engine_rx.into_inner();
863                    match message_opt {
864                        None => {
865                            error!("Consumer: messages::next: returning Disconnected");
866                            self.reconnect().await?;
867                            continue;
868                            //return Err(Error::Consumer(ConsumerError::Connection(ConnectionError::Disconnected)).into());
869                        }
870                        Some(message) => {
871                            self.remaining_messages -= message
872                                .payload
873                                .as_ref()
874                                .and_then(|payload| payload.metadata.num_messages_in_batch)
875                                .unwrap_or(1i32)
876                                as u32;
877
878                            match self.process_message(message).await {
879                                // Continue
880                                Ok(true) => {}
881                                // End of Topic
882                                Ok(false) => {
883                                    return Ok(());
884                                }
885                                Err(e) => {
886                                    if let Err(e) = self.tx.send(Err(e)).await {
887                                        error!("cannot send a message from the consumer engine to the consumer({}), stopping the engine", self.id);
888                                        return Err(Error::Consumer(e.into()));
889                                    }
890                                }
891                            }
892                        }
893                    }
894                }
895                Either::Right(((ack_opt, engine_rx), messages_rx)) => {
896                    self.messages_rx = messages_rx.into_inner();
897                    self.engine_rx = Some(engine_rx);
898
899                    match ack_opt {
900                        None => {
901                            trace!("ack channel was closed");
902                            return Ok(());
903                        }
904                        Some(EngineMessage::Ack(message_id, cumulative)) => {
905                            self.ack(message_id, cumulative);
906                        }
907                        Some(EngineMessage::Nack(message_id)) => {
908                            if let Err(e) = self
909                                .connection
910                                .sender()
911                                .send_redeliver_unacknowleged_messages(
912                                    self.id,
913                                    vec![message_id.id.clone()],
914                                )
915                            {
916                                error!(
917                                    "could not ask for redelivery for message {:?}: {:?}",
918                                    message_id, e
919                                );
920                            }
921                        }
922                        Some(EngineMessage::UnackedRedelivery) => {
923                            let mut h = HashSet::new();
924                            let now = Instant::now();
925                            //info!("unacked messages length: {}", self.unacked_messages.len());
926                            for (id, t) in self.unacked_messages.iter() {
927                                if *t < now {
928                                    h.insert(id.clone());
929                                }
930                            }
931
932                            let ids: Vec<_> = h.iter().cloned().collect();
933                            if !ids.is_empty() {
934                                //info!("will unack ids: {:?}", ids);
935                                if let Err(e) = self
936                                    .connection
937                                    .sender()
938                                    .send_redeliver_unacknowleged_messages(self.id, ids)
939                                {
940                                    error!("could not ask for redelivery: {:?}", e);
941                                } else {
942                                    for i in h.iter() {
943                                        self.unacked_messages.remove(i);
944                                    }
945                                }
946                            }
947                        }
948                        Some(EngineMessage::GetConnection(sender)) => {
949                            let _ = sender.send(self.connection.clone()).map_err(|_| {
950                                error!("consumer requested the engine's connection but dropped the channel before receiving");
951                            });
952                        }
953                    }
954                }
955            };
956        }
957    }
958
959    fn ack(&mut self, message_id: MessageData, cumulative: bool) {
960        //FIXME: this does not handle cumulative acks
961        self.unacked_messages.remove(&message_id.id);
962        let res = self
963            .connection
964            .sender()
965            .send_ack(self.id, vec![message_id.id], cumulative);
966        if res.is_err() {
967            error!("ack error: {:?}", res);
968        }
969    }
970
971    /// Process the message. Returns `true` if there are more messages to process
972    async fn process_message(&mut self, message: RawMessage) -> Result<bool, Error> {
973        match message {
974            RawMessage {
975                command:
976                    BaseCommand {
977                        reached_end_of_topic: Some(_),
978                        ..
979                    },
980                ..
981            } => {
982                return Ok(false);
983            }
984            RawMessage {
985                command:
986                    BaseCommand {
987                        active_consumer_change: Some(active_consumer_change),
988                        ..
989                    },
990                ..
991            } => {
992                // TODO: Communicate this status to the Consumer and expose it
993                debug!(
994                    "Active consumer change for {} - Active: {:?}",
995                    self.debug_format(),
996                    active_consumer_change.is_active
997                );
998            }
999            RawMessage {
1000                command:
1001                    BaseCommand {
1002                        message: Some(message),
1003                        ..
1004                    },
1005                payload: Some(payload),
1006            } => {
1007                self.process_payload(message, payload).await?;
1008            }
1009            RawMessage {
1010                command: BaseCommand {
1011                    message: Some(_), ..
1012                },
1013                payload: None,
1014            } => {
1015                error!(
1016                    "Consumer {} received message without payload",
1017                    self.debug_format()
1018                );
1019            }
1020            RawMessage {
1021                command:
1022                    BaseCommand {
1023                        close_consumer: Some(CommandCloseConsumer { consumer_id, .. }),
1024                        ..
1025                    },
1026                ..
1027            } => {
1028                error!(
1029                    "Broker notification of closed consumer {}: {}",
1030                    consumer_id,
1031                    self.debug_format()
1032                );
1033
1034                self.reconnect().await?;
1035            }
1036            unexpected => {
1037                let r#type = proto::base_command::Type::try_from(unexpected.command.r#type)
1038                    .map(|t| format!("{:?}", t))
1039                    .unwrap_or_else(|_| unexpected.command.r#type.to_string());
1040                warn!(
1041                    "Unexpected message type sent to consumer: {}. This is probably a bug!",
1042                    r#type
1043                );
1044            }
1045        }
1046        Ok(true)
1047    }
1048
1049    async fn process_payload(
1050        &mut self,
1051        message: CommandMessage,
1052        mut payload: Payload,
1053    ) -> Result<(), Error> {
1054        let compression = payload.metadata.compression;
1055
1056        let payload = match compression {
1057            None | Some(0) => payload,
1058            // LZ4
1059            Some(1) => {
1060                #[cfg(not(feature = "lz4"))]
1061                {
1062                    return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1063                        std::io::ErrorKind::Other,
1064                        "got a LZ4 compressed message but 'lz4' cargo feature is deactivated",
1065                    )))
1066                    .into());
1067                }
1068
1069                #[cfg(feature = "lz4")]
1070                {
1071                    let decompressed_payload = lz4::block::decompress(
1072                        &payload.data[..],
1073                        payload.metadata.uncompressed_size.map(|i| i as i32),
1074                    )
1075                    .map_err(ConsumerError::Io)?;
1076
1077                    payload.data = decompressed_payload;
1078                    payload
1079                }
1080            }
1081            // zlib
1082            Some(2) => {
1083                #[cfg(not(feature = "flate2"))]
1084                {
1085                    return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1086                        std::io::ErrorKind::Other,
1087                        "got a zlib compressed message but 'flate2' cargo feature is deactivated",
1088                    )))
1089                    .into());
1090                }
1091
1092                #[cfg(feature = "flate2")]
1093                {
1094                    use flate2::read::ZlibDecoder;
1095                    use std::io::Read;
1096
1097                    let mut d = ZlibDecoder::new(&payload.data[..]);
1098                    let mut decompressed_payload = Vec::new();
1099                    d.read_to_end(&mut decompressed_payload)
1100                        .map_err(ConsumerError::Io)?;
1101
1102                    payload.data = decompressed_payload;
1103                    payload
1104                }
1105            }
1106            // zstd
1107            Some(3) => {
1108                #[cfg(not(feature = "zstd"))]
1109                {
1110                    return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1111                        std::io::ErrorKind::Other,
1112                        "got a zstd compressed message but 'zstd' cargo feature is deactivated",
1113                    )))
1114                    .into());
1115                }
1116
1117                #[cfg(feature = "zstd")]
1118                {
1119                    let decompressed_payload =
1120                        zstd::decode_all(&payload.data[..]).map_err(ConsumerError::Io)?;
1121
1122                    payload.data = decompressed_payload;
1123                    payload
1124                }
1125            }
1126            //Snappy
1127            Some(4) => {
1128                #[cfg(not(feature = "snap"))]
1129                {
1130                    return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1131                        std::io::ErrorKind::Other,
1132                        "got a Snappy compressed message but 'snap' cargo feature is deactivated",
1133                    )))
1134                    .into());
1135                }
1136
1137                #[cfg(feature = "snap")]
1138                {
1139                    use std::io::Read;
1140
1141                    let mut decompressed_payload = Vec::new();
1142                    let mut decoder = snap::read::FrameDecoder::new(&payload.data[..]);
1143                    decoder
1144                        .read_to_end(&mut decompressed_payload)
1145                        .map_err(ConsumerError::Io)?;
1146
1147                    payload.data = decompressed_payload;
1148                    payload
1149                }
1150            }
1151            Some(i) => {
1152                error!("unknown compression type: {}", i);
1153                return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
1154                    std::io::ErrorKind::Other,
1155                    format!("unknown compression type: {}", i),
1156                ))));
1157            }
1158        };
1159
1160        match payload.metadata.num_messages_in_batch {
1161            Some(_) => {
1162                let it = BatchedMessageIterator::new(message.message_id, payload)?;
1163                for (id, payload) in it {
1164                    // TODO: Dead letter policy for batched messages
1165                    self.send_to_consumer(id, payload).await?;
1166                }
1167            }
1168            None => match (message.redelivery_count, self.dead_letter_policy.as_ref()) {
1169                (Some(redelivery_count), Some(dead_letter_policy)) => {
1170                    // Send message to Dead Letter Topic and ack message in original topic
1171                    if redelivery_count as usize >= dead_letter_policy.max_redeliver_count {
1172                        self.client
1173                            .send(&dead_letter_policy.dead_letter_topic, payload.data)
1174                            .await?
1175                            .await
1176                            .map_err(|e| {
1177                                error!("One shot cancelled {:?}", e);
1178                                Error::Custom("DLQ send error".to_string())
1179                            })?;
1180
1181                        self.ack(
1182                            MessageData {
1183                                id: message.message_id,
1184                                batch_size: None,
1185                            },
1186                            false,
1187                        );
1188                    } else {
1189                        self.send_to_consumer(message.message_id, payload).await?
1190                    }
1191                }
1192                _ => self.send_to_consumer(message.message_id, payload).await?,
1193            },
1194        }
1195        Ok(())
1196    }
1197
1198    async fn send_to_consumer(
1199        &mut self,
1200        message_id: MessageIdData,
1201        payload: Payload,
1202    ) -> Result<(), Error> {
1203        let now = Instant::now();
1204        self.tx
1205            .send(Ok((message_id.clone(), payload)))
1206            .await
1207            .map_err(|e| {
1208                error!("tx returned {:?}", e);
1209                Error::Custom("tx closed".to_string())
1210            })?;
1211        if let Some(duration) = self.unacked_message_redelivery_delay {
1212            self.unacked_messages.insert(message_id, now + duration);
1213        }
1214        Ok(())
1215    }
1216
1217    async fn reconnect(&mut self) -> Result<(), Error> {
1218        debug!("reconnecting consumer for topic: {}", self.topic);
1219        let broker_address = self.client.lookup_topic(&self.topic).await?;
1220        let conn = self.client.manager.get_connection(&broker_address).await?;
1221
1222        self.connection = conn;
1223
1224        let topic = self.topic.clone();
1225        let (resolver, messages) = mpsc::unbounded();
1226
1227        self.connection
1228            .sender()
1229            .subscribe(
1230                resolver,
1231                topic.clone(),
1232                self.subscription.clone(),
1233                self.sub_type,
1234                self.id,
1235                self.name.clone(),
1236                self.options.clone(),
1237            )
1238            .await
1239            .map_err(Error::Connection)?;
1240
1241        self.connection
1242            .sender()
1243            .send_flow(self.id, self.batch_size)
1244            .map_err(|e| Error::Consumer(ConsumerError::Connection(e)))?;
1245
1246        self.messages_rx = Some(messages);
1247
1248        // drop_signal will be dropped when Consumer is dropped, then
1249        // drop_receiver will return, and we can close the consumer
1250        let (_drop_signal, drop_receiver) = oneshot::channel::<()>();
1251        let conn = self.connection.clone();
1252        let name = self.name.clone();
1253        let id = self.id;
1254        let topic = self.topic.clone();
1255        let _ = self.client.executor.spawn(Box::pin(async move {
1256            let _res = drop_receiver.await;
1257            // if we receive a message, it indicates we want to stop this task
1258            if _res.is_err() {
1259                if let Err(e) = conn.sender().close_consumer(id).await {
1260                    error!(
1261                        "could not close consumer {:?}({}) for topic {}: {:?}",
1262                        name, id, topic, e
1263                    );
1264                }
1265            }
1266        }));
1267        let old_signal = std::mem::replace(&mut self._drop_signal, _drop_signal);
1268        if let Err(e) = old_signal.send(()) {
1269            error!(
1270                "could not send the drop signal to the old consumer(id={}): {:?}",
1271                id, e
1272            );
1273        }
1274
1275        Ok(())
1276    }
1277
1278    fn debug_format(&self) -> String {
1279        format!(
1280            "[{id} - {subscription}{name}: {topic}]",
1281            id = self.id,
1282            subscription = &self.subscription,
1283            name = self
1284                .name
1285                .as_ref()
1286                .map(|s| format!("({})", s))
1287                .unwrap_or_default(),
1288            topic = &self.topic
1289        )
1290    }
1291}
1292
1293#[derive(Clone, Debug, PartialEq)]
1294pub struct MessageData {
1295    pub id: proto::MessageIdData,
1296    batch_size: Option<i32>,
1297}
1298
1299struct BatchedMessageIterator {
1300    messages: std::vec::IntoIter<BatchedMessage>,
1301    message_id: proto::MessageIdData,
1302    metadata: Metadata,
1303    total_messages: u32,
1304    current_index: u32,
1305}
1306
1307impl BatchedMessageIterator {
1308    fn new(message_id: proto::MessageIdData, payload: Payload) -> Result<Self, ConnectionError> {
1309        let total_messages = payload
1310            .metadata
1311            .num_messages_in_batch
1312            .expect("expected batched message") as u32;
1313        let messages = parse_batched_message(total_messages, &payload.data)?;
1314
1315        Ok(Self {
1316            messages: messages.into_iter(),
1317            message_id,
1318            total_messages,
1319            metadata: payload.metadata,
1320            current_index: 0,
1321        })
1322    }
1323}
1324
1325impl Iterator for BatchedMessageIterator {
1326    type Item = (proto::MessageIdData, Payload);
1327
1328    fn next(&mut self) -> Option<Self::Item> {
1329        let remaining = self.total_messages - self.current_index;
1330        if remaining == 0 {
1331            return None;
1332        }
1333        let index = self.current_index;
1334        self.current_index += 1;
1335        if let Some(batched_message) = self.messages.next() {
1336            let id = proto::MessageIdData {
1337                batch_index: Some(index as i32),
1338                ..self.message_id.clone()
1339            };
1340
1341            let metadata = Metadata {
1342                properties: batched_message.metadata.properties,
1343                partition_key: batched_message.metadata.partition_key,
1344                event_time: batched_message.metadata.event_time,
1345                ..self.metadata.clone()
1346            };
1347
1348            let payload = Payload {
1349                metadata,
1350                data: batched_message.payload,
1351            };
1352
1353            Some((id, payload))
1354        } else {
1355            None
1356        }
1357    }
1358}
1359
1360/// Builder structure for consumers
1361///
1362/// This is the main way to create a [Consumer] or a [Reader]
1363#[derive(Clone)]
1364pub struct ConsumerBuilder<Exe: Executor> {
1365    pulsar: Pulsar<Exe>,
1366    topics: Option<Vec<String>>,
1367    topic_regex: Option<Regex>,
1368    subscription: Option<String>,
1369    subscription_type: Option<SubType>,
1370    consumer_id: Option<u64>,
1371    consumer_name: Option<String>,
1372    batch_size: Option<u32>,
1373    unacked_message_resend_delay: Option<Duration>,
1374    dead_letter_policy: Option<DeadLetterPolicy>,
1375    consumer_options: Option<ConsumerOptions>,
1376    namespace: Option<String>,
1377    topic_refresh: Option<Duration>,
1378}
1379
1380impl<Exe: Executor> ConsumerBuilder<Exe> {
1381    /// Creates a new [ConsumerBuilder] from an existing client instance
1382    pub fn new(pulsar: &Pulsar<Exe>) -> Self {
1383        ConsumerBuilder {
1384            pulsar: pulsar.clone(),
1385            topics: None,
1386            topic_regex: None,
1387            subscription: None,
1388            subscription_type: None,
1389            consumer_id: None,
1390            consumer_name: None,
1391            batch_size: None,
1392            //TODO what should this default to? None seems incorrect..
1393            unacked_message_resend_delay: None,
1394            dead_letter_policy: None,
1395            consumer_options: None,
1396            namespace: None,
1397            topic_refresh: None,
1398        }
1399    }
1400
1401    /// sets the consumer's topic or add one to the list of topics
1402    pub fn with_topic<S: Into<String>>(mut self, topic: S) -> ConsumerBuilder<Exe> {
1403        match &mut self.topics {
1404            Some(topics) => topics.push(topic.into()),
1405            None => self.topics = Some(vec![topic.into()]),
1406        }
1407        self
1408    }
1409
1410    /// adds a list of topics to the future consumer
1411    pub fn with_topics<S: AsRef<str>, I: IntoIterator<Item = S>>(
1412        mut self,
1413        topics: I,
1414    ) -> ConsumerBuilder<Exe> {
1415        let new_topics = topics.into_iter().map(|t| t.as_ref().into());
1416        match &mut self.topics {
1417            Some(topics) => {
1418                topics.extend(new_topics);
1419            }
1420            None => self.topics = Some(new_topics.collect()),
1421        }
1422        self
1423    }
1424
1425    /// sets up a consumer that will listen on all topics matching the regular
1426    /// expression
1427    pub fn with_topic_regex(mut self, regex: Regex) -> ConsumerBuilder<Exe> {
1428        self.topic_regex = Some(regex);
1429        self
1430    }
1431
1432    /// sets the subscription's name
1433    pub fn with_subscription<S: Into<String>>(mut self, subscription: S) -> Self {
1434        self.subscription = Some(subscription.into());
1435        self
1436    }
1437
1438    /// sets the kind of subscription
1439    pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
1440        self.subscription_type = Some(subscription_type);
1441        self
1442    }
1443
1444    /// Tenant/Namespace to be used when matching against a regex. For other consumers,
1445    /// specify namespace using the `<persistent|non-persistent://<tenant>/<namespace>/<topic>`
1446    /// topic format.
1447    /// Defaults to `public/default` if not specifid
1448    pub fn with_lookup_namespace<S: Into<String>>(mut self, namespace: S) -> Self {
1449        self.namespace = Some(namespace.into());
1450        self
1451    }
1452
1453    /// Interval for refreshing the topics when using a topic regex. Unused otherwise.
1454    pub fn with_topic_refresh(mut self, refresh_interval: Duration) -> Self {
1455        self.topic_refresh = Some(refresh_interval);
1456        self
1457    }
1458
1459    /// sets the consumer id for this consumer
1460    pub fn with_consumer_id(mut self, consumer_id: u64) -> Self {
1461        self.consumer_id = Some(consumer_id);
1462        self
1463    }
1464
1465    /// sets the consumer's name
1466    pub fn with_consumer_name<S: Into<String>>(mut self, consumer_name: S) -> Self {
1467        self.consumer_name = Some(consumer_name.into());
1468        self
1469    }
1470
1471    /// sets the batch size
1472    ///
1473    /// batch messages containing more than the configured batch size will
1474    /// not be sent by Pulsar
1475    ///
1476    /// default value: 1000
1477    pub fn with_batch_size(mut self, batch_size: u32) -> Self {
1478        self.batch_size = Some(batch_size);
1479        self
1480    }
1481
1482    /// sets consumer options
1483    pub fn with_options(mut self, options: ConsumerOptions) -> Self {
1484        self.consumer_options = Some(options);
1485        self
1486    }
1487
1488    /// sets the dead letter policy
1489    pub fn with_dead_letter_policy(mut self, dead_letter_policy: DeadLetterPolicy) -> Self {
1490        self.dead_letter_policy = Some(dead_letter_policy);
1491        self
1492    }
1493
1494    /// The time after which a message is dropped without being acknowledged or nacked
1495    /// that the message is resent. If `None`, messages will only be resent when a
1496    /// consumer disconnects with pending unacknowledged messages.
1497    pub fn with_unacked_message_resend_delay(mut self, delay: Option<Duration>) -> Self {
1498        self.unacked_message_resend_delay = delay;
1499        self
1500    }
1501
1502    // Checks the builder for inconsistencies
1503    // returns a config and a list of topics with associated brokers
1504    async fn validate<T: DeserializeMessage>(
1505        self,
1506    ) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
1507        let ConsumerBuilder {
1508            pulsar,
1509            topics,
1510            topic_regex,
1511            subscription,
1512            subscription_type,
1513            consumer_id,
1514            mut consumer_name,
1515            batch_size,
1516            unacked_message_resend_delay,
1517            consumer_options,
1518            dead_letter_policy,
1519            namespace: _,
1520            topic_refresh: _,
1521        } = self;
1522
1523        if consumer_name.is_none() {
1524            let s: String = (0..8)
1525                .map(|_| rand::thread_rng().sample(Alphanumeric))
1526                .map(|c| c as char)
1527                .collect();
1528            consumer_name = Some(format!("consumer_{}", s));
1529        }
1530
1531        if topics.is_none() && topic_regex.is_none() {
1532            return Err(Error::Custom(
1533                "Cannot create consumer with no topics and no topic regex".into(),
1534            ));
1535        }
1536
1537        let topics: Vec<(String, BrokerAddress)> = try_join_all(
1538            topics
1539                .into_iter()
1540                .flatten()
1541                .map(|topic| pulsar.lookup_partitioned_topic(topic)),
1542        )
1543        .await?
1544        .into_iter()
1545        .flatten()
1546        .collect();
1547
1548        if topics.is_empty() && topic_regex.is_none() {
1549            return Err(Error::Custom(
1550                "Unable to create consumer - topic not found".to_string(),
1551            ));
1552        }
1553
1554        let consumer_id = match (consumer_id, topics.len()) {
1555            (Some(consumer_id), 1) => Some(consumer_id),
1556            (Some(_), _) => {
1557                warn!("Cannot specify consumer id for connecting to partitioned topics or multiple topics");
1558                None
1559            }
1560            _ => None,
1561        };
1562        let subscription = subscription.unwrap_or_else(|| {
1563            let s: String = (0..8)
1564                .map(|_| rand::thread_rng().sample(Alphanumeric))
1565                .map(|c| c as char)
1566                .collect();
1567            let subscription = format!("sub_{}", s);
1568            warn!(
1569                "Subscription not specified. Using new subscription `{}`.",
1570                subscription
1571            );
1572            subscription
1573        });
1574        let sub_type = subscription_type.unwrap_or_else(|| {
1575            warn!("Subscription Type not specified. Defaulting to `Shared`.");
1576            SubType::Shared
1577        });
1578
1579        let config = ConsumerConfig {
1580            subscription,
1581            sub_type,
1582            batch_size,
1583            consumer_name,
1584            consumer_id,
1585            unacked_message_redelivery_delay: unacked_message_resend_delay,
1586            options: consumer_options.unwrap_or_default(),
1587            dead_letter_policy,
1588        };
1589        Ok((config, topics))
1590    }
1591
1592    /// creates a [Consumer] from this builder
1593    pub async fn build<T: DeserializeMessage>(self) -> Result<Consumer<T, Exe>, Error> {
1594        // would this clone() consume too much memory?
1595        let (config, joined_topics) = self.clone().validate::<T>().await?;
1596
1597        let consumers = try_join_all(joined_topics.into_iter().map(|(topic, addr)| {
1598            TopicConsumer::new(self.pulsar.clone(), topic, addr, config.clone())
1599        }))
1600        .await?;
1601
1602        let consumer = if consumers.len() == 1 {
1603            let consumer = consumers.into_iter().next().unwrap();
1604            InnerConsumer::Single(consumer)
1605        } else {
1606            let consumers: BTreeMap<_, _> = consumers
1607                .into_iter()
1608                .map(|c| (c.topic(), Box::pin(c)))
1609                .collect();
1610            let topics = consumers.keys().cloned().collect();
1611            let topic_refresh = self
1612                .topic_refresh
1613                .unwrap_or_else(|| Duration::from_secs(30));
1614            let refresh = Box::pin(self.pulsar.executor.interval(topic_refresh).map(drop));
1615            let mut consumer = MultiTopicConsumer {
1616                namespace: self
1617                    .namespace
1618                    .unwrap_or_else(|| "public/default".to_string()),
1619                topic_regex: self.topic_regex,
1620                pulsar: self.pulsar,
1621                consumers,
1622                topics,
1623                new_consumers: None,
1624                refresh,
1625                config,
1626                disc_last_message_received: None,
1627                disc_messages_received: 0,
1628            };
1629            if consumer.topic_regex.is_some() {
1630                consumer.update_topics();
1631                let initial_consumers = consumer.new_consumers.take().unwrap().await?;
1632                consumer.add_consumers(initial_consumers);
1633            }
1634            InnerConsumer::Multi(consumer)
1635        };
1636        Ok(Consumer { inner: consumer })
1637    }
1638
1639    /// creates a [Reader] from this builder
1640    pub async fn into_reader<T: DeserializeMessage>(self) -> Result<Reader<T, Exe>, Error> {
1641        // would this clone() consume too much memory?
1642        let (mut config, mut joined_topics) = self.clone().validate::<T>().await?;
1643
1644        // the validate() function defaults sub_type to SubType::Shared,
1645        // but a reader's subscription is exclusive
1646        warn!("Subscription Type for a reader is `Exclusive`. Resetting.");
1647        config.sub_type = SubType::Exclusive;
1648
1649        if self.topics.unwrap().len() > 1 {
1650            return Err(Error::Custom(
1651                "Unable to create a reader - one topic max".to_string(),
1652            ));
1653        }
1654
1655        let (topic, addr) = joined_topics.pop().unwrap();
1656        let consumer = TopicConsumer::new(self.pulsar.clone(), topic, addr, config.clone()).await?;
1657
1658        Ok(Reader {
1659            consumer,
1660            state: Some(State::PollingConsumer),
1661        })
1662    }
1663}
1664
1665/// the complete configuration of a consumer
1666#[derive(Debug, Clone, Default)]
1667pub(crate) struct ConsumerConfig {
1668    /// subscription name
1669    pub(crate) subscription: String,
1670    /// subscription type
1671    ///
1672    /// default: Shared
1673    pub(crate) sub_type: SubType,
1674    /// maximum size for batched messages
1675    ///
1676    /// default: 1000
1677    pub(crate) batch_size: Option<u32>,
1678    /// name of the consumer
1679    pub(crate) consumer_name: Option<String>,
1680    /// numerical id of the consumer
1681    consumer_id: Option<u64>,
1682    /// time after which unacked messages will be sent again
1683    unacked_message_redelivery_delay: Option<Duration>,
1684    /// consumer options
1685    pub(crate) options: ConsumerOptions,
1686    /// dead letter policy
1687    dead_letter_policy: Option<DeadLetterPolicy>,
1688}
1689
1690/// A consumer that can subscribe on multiple topics, from a regex matching topic names
1691struct MultiTopicConsumer<T: DeserializeMessage, Exe: Executor> {
1692    namespace: String,
1693    topic_regex: Option<Regex>,
1694    pulsar: Pulsar<Exe>,
1695    consumers: BTreeMap<String, Pin<Box<TopicConsumer<T, Exe>>>>,
1696    topics: VecDeque<String>,
1697    #[allow(clippy::type_complexity)]
1698    new_consumers:
1699        Option<Pin<Box<dyn Future<Output = Result<Vec<TopicConsumer<T, Exe>>, Error>> + Send>>>,
1700    refresh: Pin<Box<dyn Stream<Item = ()> + Send>>,
1701    config: ConsumerConfig,
1702    // Stats on disconnected consumers to keep metrics correct
1703    disc_messages_received: u64,
1704    disc_last_message_received: Option<DateTime<Utc>>,
1705}
1706
1707impl<T: DeserializeMessage, Exe: Executor> MultiTopicConsumer<T, Exe> {
1708    fn topics(&self) -> Vec<String> {
1709        self.topics.iter().map(|s| s.to_string()).collect()
1710    }
1711
1712    fn last_message_received(&self) -> Option<DateTime<Utc>> {
1713        self.consumers
1714            .values()
1715            .filter_map(|c| c.last_message_received)
1716            .chain(self.disc_last_message_received)
1717            .max()
1718    }
1719
1720    fn messages_received(&self) -> u64 {
1721        self.consumers
1722            .values()
1723            .map(|c| c.messages_received)
1724            .chain(iter::once(self.disc_messages_received))
1725            .sum()
1726    }
1727
1728    async fn check_connections(&mut self) -> Result<(), Error> {
1729        self.pulsar
1730            .manager
1731            .get_base_connection()
1732            .await?
1733            .sender()
1734            .send_ping()
1735            .await?;
1736
1737        for consumer in self.consumers.values_mut() {
1738            consumer.connection().await?.sender().send_ping().await?;
1739        }
1740
1741        Ok(())
1742    }
1743
1744    async fn unsubscribe(&mut self) -> Result<(), Error> {
1745        for consumer in self.consumers.values_mut() {
1746            consumer.unsubscribe().await?;
1747        }
1748
1749        Ok(())
1750    }
1751
1752    fn add_consumers<I: IntoIterator<Item = TopicConsumer<T, Exe>>>(&mut self, consumers: I) {
1753        for consumer in consumers {
1754            let topic = consumer.topic().to_owned();
1755            self.consumers.insert(topic.clone(), Box::pin(consumer));
1756            self.topics.push_back(topic);
1757        }
1758    }
1759
1760    fn remove_consumers(&mut self, topics: &[String]) {
1761        self.topics.retain(|t| !topics.contains(t));
1762        for topic in topics {
1763            if let Some(consumer) = self.consumers.remove(topic) {
1764                self.disc_messages_received += consumer.messages_received;
1765                self.disc_last_message_received = self
1766                    .disc_last_message_received
1767                    .into_iter()
1768                    .chain(consumer.last_message_received)
1769                    .max();
1770            }
1771        }
1772    }
1773
1774    fn update_topics(&mut self) {
1775        if let Some(regex) = self.topic_regex.clone() {
1776            let pulsar = self.pulsar.clone();
1777            let namespace = self.namespace.clone();
1778            let existing_topics: BTreeSet<String> = self.consumers.keys().cloned().collect();
1779            let consumer_config = self.config.clone();
1780
1781            self.new_consumers = Some(Box::pin(async move {
1782                let topics = pulsar
1783                    .get_topics_of_namespace(
1784                        namespace.clone(),
1785                        proto::command_get_topics_of_namespace::Mode::All,
1786                    )
1787                    .await?;
1788                trace!("fetched topics {:?}", topics);
1789
1790                let topics: Vec<_> = try_join_all(
1791                    topics
1792                        .into_iter()
1793                        .filter(|t| regex.is_match(t))
1794                        .map(|topic| pulsar.lookup_partitioned_topic(topic)),
1795                )
1796                .await?
1797                .into_iter()
1798                .flatten()
1799                .collect();
1800
1801                trace!("matched topics {:?} (regex: {})", topics, &regex);
1802
1803                let consumers = try_join_all(
1804                    topics
1805                        .into_iter()
1806                        .filter(|(t, _)| !existing_topics.contains(t))
1807                        .map(|(topic, addr)| {
1808                            TopicConsumer::new(pulsar.clone(), topic, addr, consumer_config.clone())
1809                        }),
1810                )
1811                .await?;
1812                trace!("created {} consumers", consumers.len());
1813                Ok(consumers)
1814            }));
1815        }
1816    }
1817
1818    async fn ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
1819        if let Some(c) = self.consumers.get_mut(&msg.topic) {
1820            c.ack(msg).await
1821        } else {
1822            Err(ConnectionError::Unexpected(format!("no consumer for topic {}", msg.topic)).into())
1823        }
1824    }
1825
1826    async fn cumulative_ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
1827        if let Some(c) = self.consumers.get_mut(&msg.topic) {
1828            c.cumulative_ack(msg).await
1829        } else {
1830            Err(ConnectionError::Unexpected(format!("no consumer for topic {}", msg.topic)).into())
1831        }
1832    }
1833
1834    async fn nack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
1835        if let Some(c) = self.consumers.get_mut(&msg.topic) {
1836            c.nack(msg).await?;
1837            Ok(())
1838        } else {
1839            Err(ConnectionError::Unexpected(format!("no consumer for topic {}", msg.topic)).into())
1840        }
1841    }
1842
1843    /// Assume that this seek method will call seek for the topics given in the consumer_ids
1844    async fn seek(
1845        &mut self,
1846        consumer_ids: Option<Vec<String>>,
1847        message_id: Option<MessageIdData>,
1848        timestamp: Option<u64>,
1849    ) -> Result<(), Error> {
1850        // 0. null or empty vector
1851        match consumer_ids {
1852            Some(consumer_ids) => {
1853                // 1, select consumers
1854                let mut actions = Vec::default();
1855                for (consumer_id, consumer) in self.consumers.iter_mut() {
1856                    if consumer_ids.contains(consumer_id) {
1857                        actions.push(consumer.seek(message_id.clone(), timestamp));
1858                    }
1859                }
1860                // 2 join all the futures
1861                let mut v = futures::future::join_all(actions).await;
1862
1863                for res in v.drain(..) {
1864                    if res.is_err() {
1865                        return res;
1866                    }
1867                }
1868
1869                Ok(())
1870            }
1871            None => Err(ConnectionError::Unexpected(format!(
1872                "no consumer for consumer ids {:?}",
1873                consumer_ids
1874            ))
1875            .into()),
1876        }
1877    }
1878
1879    fn config(&self) -> &ConsumerConfig {
1880        &self.config
1881    }
1882}
1883
1884/// a message received by a consumer
1885///
1886/// it is generic over the type it can be deserialized to
1887pub struct Message<T> {
1888    /// origin topic of the message
1889    pub topic: String,
1890    /// contains the message's data and other metadata
1891    pub payload: Payload,
1892    /// contains the message's id and batch size data
1893    pub message_id: MessageData,
1894    _phantom: PhantomData<T>,
1895}
1896
1897impl<T> Message<T> {
1898    /// Pulsar metadata for the message
1899    pub fn metadata(&self) -> &MessageMetadata {
1900        &self.payload.metadata
1901    }
1902
1903    /// Get Pulsar message id for the message
1904    pub fn message_id(&self) -> &proto::MessageIdData {
1905        &self.message_id.id
1906    }
1907
1908    /// Get message key (partition key)
1909    pub fn key(&self) -> Option<String> {
1910        self.payload.metadata.partition_key.clone()
1911    }
1912}
1913impl<T: DeserializeMessage> Message<T> {
1914    /// directly deserialize a message
1915    pub fn deserialize(&self) -> T::Output {
1916        T::deserialize_message(&self.payload)
1917    }
1918}
1919
1920impl<T: DeserializeMessage, Exe: Executor> Debug for MultiTopicConsumer<T, Exe> {
1921    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1922        write!(
1923            f,
1924            "MultiTopicConsumer({:?}, {:?})",
1925            &self.namespace, &self.topic_regex
1926        )
1927    }
1928}
1929
1930impl<T: DeserializeMessage, Exe: Executor> Stream for MultiTopicConsumer<T, Exe> {
1931    type Item = Result<Message<T>, Error>;
1932
1933    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1934        if let Some(mut new_consumers) = self.new_consumers.take() {
1935            match new_consumers.as_mut().poll(cx) {
1936                Poll::Ready(Ok(new_consumers)) => {
1937                    self.add_consumers(new_consumers);
1938                }
1939                Poll::Pending => {
1940                    self.new_consumers = Some(new_consumers);
1941                }
1942                Poll::Ready(Err(e)) => {
1943                    error!("Error creating pulsar consumers: {}", e);
1944                    // don't return error here; could be intermittent connection failure and we want
1945                    // to retry
1946                }
1947            }
1948        }
1949
1950        if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
1951            self.update_topics();
1952            return self.poll_next(cx);
1953        }
1954
1955        let mut topics_to_remove = Vec::new();
1956        let mut result = None;
1957        for _ in 0..self.topics.len() {
1958            if result.is_some() {
1959                break;
1960            }
1961            let topic = self.topics.pop_front().unwrap();
1962            if let Some(item) = self
1963                .consumers
1964                .get_mut(&topic)
1965                .map(|c| c.as_mut().poll_next(cx))
1966            {
1967                match item {
1968                    Poll::Pending => {}
1969                    Poll::Ready(Some(Ok(msg))) => result = Some(msg),
1970                    Poll::Ready(None) => {
1971                        error!("Unexpected end of stream for pulsar topic {}", &topic);
1972                        topics_to_remove.push(topic.clone());
1973                    }
1974                    Poll::Ready(Some(Err(e))) => {
1975                        error!(
1976                            "Unexpected error consuming from pulsar topic {}: {}",
1977                            &topic, e
1978                        );
1979                        topics_to_remove.push(topic.clone());
1980                    }
1981                }
1982            } else {
1983                eprintln!("BUG: Missing consumer for topic {}", &topic);
1984            }
1985            self.topics.push_back(topic);
1986        }
1987        self.remove_consumers(&topics_to_remove);
1988        if let Some(result) = result {
1989            return Poll::Ready(Some(Ok(result)));
1990        }
1991
1992        Poll::Pending
1993    }
1994}
1995
1996#[cfg(test)]
1997mod tests {
1998    use std::time::{SystemTime, UNIX_EPOCH};
1999
2000    use futures::{StreamExt, TryStreamExt};
2001    use log::LevelFilter;
2002    use regex::Regex;
2003    #[cfg(feature = "tokio-runtime")]
2004    use tokio::time::timeout;
2005
2006    #[cfg(feature = "tokio-runtime")]
2007    use crate::executor::TokioExecutor;
2008    use crate::{producer, tests::TEST_LOGGER, Pulsar, SerializeMessage};
2009
2010    use super::*;
2011    use futures::future::{select, Either};
2012
2013    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
2014    pub struct TestData {
2015        topic: String,
2016        msg: u32,
2017    }
2018
2019    impl<'a> SerializeMessage for &'a TestData {
2020        fn serialize_message(input: Self) -> Result<producer::Message, Error> {
2021            let payload = serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?;
2022            Ok(producer::Message {
2023                payload,
2024                ..Default::default()
2025            })
2026        }
2027    }
2028
2029    impl DeserializeMessage for TestData {
2030        type Output = Result<TestData, serde_json::Error>;
2031
2032        fn deserialize_message(payload: &Payload) -> Self::Output {
2033            serde_json::from_slice(&payload.data)
2034        }
2035    }
2036
2037    pub static MULTI_LOGGER: crate::tests::SimpleLogger = crate::tests::SimpleLogger {
2038        tag: "multi_consumer",
2039    };
2040    #[tokio::test]
2041    #[cfg(feature = "tokio-runtime")]
2042    async fn multi_consumer() {
2043        let _ = log::set_logger(&MULTI_LOGGER);
2044        let _ = log::set_max_level(LevelFilter::Debug);
2045        let addr = "pulsar://127.0.0.1:6650";
2046
2047        let topic_n: u16 = rand::random();
2048        let topic1 = format!("multi_consumer_a_{}", topic_n);
2049        let topic2 = format!("multi_consumer_b_{}", topic_n);
2050
2051        let data1 = TestData {
2052            topic: "a".to_owned(),
2053            msg: 1,
2054        };
2055        let data2 = TestData {
2056            topic: "a".to_owned(),
2057            msg: 2,
2058        };
2059        let data3 = TestData {
2060            topic: "b".to_owned(),
2061            msg: 3,
2062        };
2063        let data4 = TestData {
2064            topic: "b".to_owned(),
2065            msg: 4,
2066        };
2067
2068        let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2069
2070        try_join_all(vec![
2071            client.send(&topic1, &data1),
2072            client.send(&topic1, &data2),
2073            client.send(&topic2, &data3),
2074            client.send(&topic2, &data4),
2075        ])
2076        .await
2077        .unwrap();
2078
2079        let builder = client
2080            .consumer()
2081            .with_subscription_type(SubType::Shared)
2082            // get earliest messages
2083            .with_options(ConsumerOptions {
2084                initial_position: InitialPosition::Earliest,
2085                ..Default::default()
2086            });
2087
2088        let consumer_1: Consumer<TestData, _> = builder
2089            .clone()
2090            .with_subscription("consumer_1")
2091            .with_topics(&[&topic1, &topic2])
2092            .build()
2093            .await
2094            .unwrap();
2095
2096        let consumer_2: Consumer<TestData, _> = builder
2097            .with_subscription("consumer_2")
2098            .with_topic_regex(Regex::new(&format!("multi_consumer_[ab]_{}", topic_n)).unwrap())
2099            .build()
2100            .await
2101            .unwrap();
2102
2103        let expected: HashSet<_> = vec![data1, data2, data3, data4].into_iter().collect();
2104        for consumer in [consumer_1, consumer_2].iter_mut() {
2105            let connected_topics = consumer.topics();
2106            debug!(
2107                "connected topics for {}: {:?}",
2108                consumer.subscription(),
2109                &connected_topics
2110            );
2111            assert_eq!(connected_topics.len(), 2);
2112            assert!(connected_topics.iter().any(|t| t.ends_with(&topic1)));
2113            assert!(connected_topics.iter().any(|t| t.ends_with(&topic2)));
2114
2115            let mut received = HashSet::new();
2116            while let Some(message) = timeout(Duration::from_secs(1), consumer.next())
2117                .await
2118                .unwrap()
2119            {
2120                received.insert(message.unwrap().deserialize().unwrap());
2121                if received.len() == 4 {
2122                    break;
2123                }
2124            }
2125            assert_eq!(expected, received);
2126            assert_eq!(consumer.messages_received(), 4);
2127            assert!(consumer.last_message_received().is_some());
2128        }
2129    }
2130
2131    #[tokio::test]
2132    #[cfg(feature = "tokio-runtime")]
2133    async fn consumer_dropped_with_lingering_acks() {
2134        use rand::{distributions::Alphanumeric, Rng};
2135        let _ = log::set_logger(&TEST_LOGGER);
2136        let _ = log::set_max_level(LevelFilter::Debug);
2137        let addr = "pulsar://127.0.0.1:6650";
2138
2139        let topic = format!(
2140            "consumer_dropped_with_lingering_acks_{}",
2141            rand::random::<u16>()
2142        );
2143
2144        let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2145
2146        let message = TestData {
2147            topic: std::iter::repeat(())
2148                .map(|()| rand::thread_rng().sample(Alphanumeric) as char)
2149                .take(8)
2150                .map(|c| c as char)
2151                .collect(),
2152            msg: 1,
2153        };
2154
2155        client.send(&topic, &message).await.unwrap().await.unwrap();
2156        println!("producer sends done");
2157
2158        {
2159            println!("creating consumer");
2160            let mut consumer: Consumer<TestData, _> = client
2161                .consumer()
2162                .with_topic(&topic)
2163                .with_subscription("dropped_ack")
2164                .with_subscription_type(SubType::Shared)
2165                // get earliest messages
2166                .with_options(ConsumerOptions {
2167                    initial_position: InitialPosition::Earliest,
2168                    ..Default::default()
2169                })
2170                .build()
2171                .await
2172                .unwrap();
2173
2174            println!("created consumer");
2175
2176            //consumer.next().await
2177            let msg: Message<TestData> = timeout(Duration::from_secs(1), consumer.next())
2178                .await
2179                .unwrap()
2180                .unwrap()
2181                .unwrap();
2182            println!("got message: {:?}", msg.payload);
2183            assert_eq!(
2184                message,
2185                msg.deserialize().unwrap(),
2186                "we probably receive a message from a previous run of the test"
2187            );
2188            consumer.ack(&msg).await.unwrap();
2189        }
2190
2191        {
2192            println!("creating second consumer. The message should have been acked");
2193            let mut consumer: Consumer<TestData, _> = client
2194                .consumer()
2195                .with_topic(&topic)
2196                .with_subscription("dropped_ack")
2197                .with_subscription_type(SubType::Shared)
2198                .with_options(ConsumerOptions {
2199                    initial_position: InitialPosition::Earliest,
2200                    ..Default::default()
2201                })
2202                .build()
2203                .await
2204                .unwrap();
2205
2206            println!("created second consumer");
2207
2208            // the message has already been acked, so we should not receive anything
2209            let res: Result<_, tokio::time::error::Elapsed> =
2210                tokio::time::timeout(Duration::from_secs(1), consumer.next()).await;
2211            let is_err = res.is_err();
2212            if let Ok(val) = res {
2213                let msg = val.unwrap().unwrap();
2214                println!("got message: {:?}", msg.payload);
2215                // cleanup for the next test
2216                consumer.ack(&msg).await.unwrap();
2217                // we should not receive a different message anyway
2218                assert_eq!(message, msg.deserialize().unwrap());
2219            }
2220
2221            assert!(is_err, "waiting for a message should have timed out, since we already acknowledged the only message in the queue");
2222        }
2223    }
2224
2225    #[tokio::test]
2226    #[cfg(feature = "tokio-runtime")]
2227    async fn dead_letter_queue() {
2228        let _ = log::set_logger(&TEST_LOGGER);
2229        let _ = log::set_max_level(LevelFilter::Debug);
2230        let addr = "pulsar://127.0.0.1:6650";
2231
2232        let test_id: u16 = rand::random();
2233        let topic = format!("dead_letter_queue_test_{}", test_id);
2234        let test_msg: u32 = rand::random();
2235
2236        let message = TestData {
2237            topic: topic.clone(),
2238            msg: test_msg,
2239        };
2240
2241        let dead_letter_topic = format!("{}_dlq", topic);
2242
2243        let dead_letter_policy = crate::consumer::DeadLetterPolicy {
2244            max_redeliver_count: 1,
2245            dead_letter_topic: dead_letter_topic.clone(),
2246        };
2247
2248        let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2249
2250        println!("creating consumer");
2251        let mut consumer: Consumer<TestData, _> = client
2252            .consumer()
2253            .with_topic(topic.clone())
2254            .with_subscription("nack")
2255            .with_subscription_type(SubType::Shared)
2256            .with_dead_letter_policy(dead_letter_policy)
2257            .build()
2258            .await
2259            .unwrap();
2260
2261        println!("created consumer");
2262
2263        println!("creating second consumer that consumes from the DLQ");
2264        let mut dlq_consumer: Consumer<TestData, _> = client
2265            .clone()
2266            .consumer()
2267            .with_topic(dead_letter_topic)
2268            .with_subscription("dead_letter_topic")
2269            .with_subscription_type(SubType::Shared)
2270            .build()
2271            .await
2272            .unwrap();
2273
2274        println!("created second consumer");
2275
2276        client.send(&topic, &message).await.unwrap().await.unwrap();
2277        println!("producer sends done");
2278
2279        let msg = consumer.next().await.unwrap().unwrap();
2280        println!("got message: {:?}", msg.payload);
2281        assert_eq!(
2282            message,
2283            msg.deserialize().unwrap(),
2284            "we probably received a message from a previous run of the test"
2285        );
2286        // Nacking message to send it to DLQ
2287        consumer.nack(&msg).await.unwrap();
2288
2289        let dlq_msg = dlq_consumer.next().await.unwrap().unwrap();
2290        println!("got message: {:?}", msg.payload);
2291        assert_eq!(
2292            message,
2293            dlq_msg.deserialize().unwrap(),
2294            "we probably received a message from a previous run of the test"
2295        );
2296        dlq_consumer.ack(&dlq_msg).await.unwrap();
2297    }
2298
2299    #[tokio::test]
2300    #[cfg(feature = "tokio-runtime")]
2301    async fn failover() {
2302        let _ = log::set_logger(&MULTI_LOGGER);
2303        let _ = log::set_max_level(LevelFilter::Debug);
2304        let addr = "pulsar://127.0.0.1:6650";
2305        let topic = format!("failover_{}", rand::random::<u16>());
2306        let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2307
2308        let msg_count = 100_u32;
2309        try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string())))
2310            .await
2311            .unwrap();
2312
2313        let builder = client
2314            .consumer()
2315            .with_subscription("failover")
2316            .with_topic(&topic)
2317            .with_subscription_type(SubType::Failover)
2318            // get earliest messages
2319            .with_options(ConsumerOptions {
2320                initial_position: InitialPosition::Earliest,
2321                ..Default::default()
2322            });
2323
2324        let mut consumer_1: Consumer<String, _> = builder.clone().build().await.unwrap();
2325
2326        let mut consumer_2: Consumer<String, _> = builder.build().await.unwrap();
2327
2328        let mut consumed_1 = 0_u32;
2329        let mut consumed_2 = 0_u32;
2330        let mut pending_1 = Some(consumer_1.next());
2331        let mut pending_2 = Some(consumer_2.next());
2332        while consumed_1 + consumed_2 < msg_count {
2333            let next = select(pending_1.take().unwrap(), pending_2.take().unwrap());
2334            match timeout(Duration::from_secs(2), next).await.unwrap() {
2335                Either::Left((msg, pending)) => {
2336                    consumed_1 += 1;
2337                    let _ = consumer_1.ack(&msg.unwrap().unwrap());
2338                    pending_1 = Some(consumer_1.next());
2339                    pending_2 = Some(pending);
2340                }
2341                Either::Right((msg, pending)) => {
2342                    consumed_2 += 1;
2343                    let _ = consumer_2.ack(&msg.unwrap().unwrap());
2344                    pending_1 = Some(pending);
2345                    pending_2 = Some(consumer_2.next());
2346                }
2347            }
2348        }
2349        match (consumed_1, consumed_2) {
2350            (consumed_1, 0) => assert_eq!(consumed_1, msg_count),
2351            (0, consumed_2) => assert_eq!(consumed_2, msg_count),
2352            _ => panic!("Expected one consumer to consume all messages. Message count: {}, consumer_1: {} consumer_2: {}", msg_count, consumed_1, consumed_2),
2353        }
2354    }
2355
2356    #[tokio::test]
2357    #[cfg(feature = "tokio-runtime")]
2358    async fn seek_single_consumer() {
2359        let _ = log::set_logger(&MULTI_LOGGER);
2360        let _ = log::set_max_level(LevelFilter::Debug);
2361        log::info!("starting seek test");
2362        let addr = "pulsar://127.0.0.1:6650";
2363        let topic = format!("seek_{}", rand::random::<u16>());
2364        let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
2365
2366        // send 100 messages and record the starting time
2367        let msg_count = 100_u32;
2368
2369        let start_time: u64 = SystemTime::now()
2370            .duration_since(UNIX_EPOCH)
2371            .unwrap()
2372            .as_millis() as u64;
2373
2374        std::thread::sleep(Duration::from_secs(2));
2375
2376        println!("this is the starting time: {}", start_time);
2377
2378        try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string())))
2379            .await
2380            .unwrap();
2381        log::info!("sent all messages");
2382
2383        let mut consumer_1: Consumer<String, _> = client
2384            .consumer()
2385            .with_consumer_name("seek_single_test")
2386            .with_subscription("seek_single_test")
2387            .with_subscription_type(SubType::Shared)
2388            .with_topic(&topic)
2389            .build()
2390            .await
2391            .unwrap();
2392
2393        log::info!("built the consumer");
2394
2395        let mut consumed_1 = 0_u32;
2396        while let Some(msg) = consumer_1.try_next().await.unwrap() {
2397            consumer_1.ack(&msg).await.unwrap();
2398            let publish_time = msg.metadata().publish_time;
2399            let data = match msg.deserialize() {
2400                Ok(data) => data,
2401                Err(e) => {
2402                    log::error!("could not deserialize message: {:?}", e);
2403                    break;
2404                }
2405            };
2406
2407            consumed_1 += 1;
2408            log::info!(
2409                "first loop, got {} messages, content: {}, publish time: {}",
2410                consumed_1,
2411                data,
2412                publish_time
2413            );
2414
2415            //break after enough half of the messages were received
2416            if consumed_1 >= msg_count / 2 {
2417                log::info!("first loop, received {} messages, so break", consumed_1);
2418                break;
2419            }
2420        }
2421
2422        // // call seek(timestamp), roll back the consumer to start_time
2423        log::info!("calling seek method");
2424        let _seek_result = consumer_1
2425            .seek(None, None, Some(start_time), client)
2426            .await
2427            .unwrap();
2428
2429        // let mut consumer_2: Consumer<String, _> = client
2430        // .consumer()
2431        // .with_consumer_name("seek")
2432        // .with_subscription("seek")
2433        // .with_topic(&topic)
2434        // .build()
2435        // .await
2436        // .unwrap();
2437
2438        // then read the messages again
2439        let mut consumed_2 = 0_u32;
2440        log::info!("reading messages again");
2441        while let Some(msg) = consumer_1.try_next().await.unwrap() {
2442            let publish_time = msg.metadata().publish_time;
2443            consumer_1.ack(&msg).await.unwrap();
2444            let data = match msg.deserialize() {
2445                Ok(data) => data,
2446                Err(e) => {
2447                    log::error!("could not deserialize message: {:?}", e);
2448                    break;
2449                }
2450            };
2451            consumed_2 += 1;
2452            log::info!(
2453                "second loop, got {} messages, content: {},  publish time: {}",
2454                consumed_2,
2455                data,
2456                publish_time,
2457            );
2458
2459            if consumed_2 >= msg_count {
2460                log::info!("received {} messagses, so break", consumed_2);
2461                break;
2462            }
2463        }
2464
2465        //then check if all messages were received
2466        assert_eq!(50, consumed_1);
2467        assert_eq!(100, consumed_2);
2468    }
2469}