mod batched_message_iterator;
pub mod builder;
pub mod config;
pub mod data;
mod engine;
pub(crate) mod initial_position;
pub mod message;
mod multi;
pub mod options;
pub mod topic;
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
pin::Pin,
time::Duration,
};
pub use builder::ConsumerBuilder;
use chrono::{DateTime, Utc};
pub use data::{DeadLetterPolicy, EngineMessage};
use futures::{
future::try_join_all,
task::{Context, Poll},
Stream, StreamExt,
};
pub use initial_position::InitialPosition;
pub use message::Message;
use multi::MultiTopicConsumer;
pub use options::ConsumerOptions;
pub use topic::TopicConsumer;
use url::Url;
use crate::{
error::{ConsumerError, Error},
executor::Executor,
message::proto::{command_subscribe::SubType, MessageIdData, Schema},
proto::CommandConsumerStatsResponse,
DeserializeMessage, Pulsar,
};
/// Internal dispatch between the two consumer implementations: a consumer
/// bound to exactly one topic, or one aggregating several topics (from a
/// topic list or a regex subscription).
enum InnerConsumer<T: DeserializeMessage, Exe: Executor> {
    /// Consumes from a single topic.
    Single(TopicConsumer<T, Exe>),
    /// Multiplexes several per-topic consumers behind one stream.
    Multi(MultiTopicConsumer<T, Exe>),
}
/// A Pulsar consumer over one or several topics.
///
/// Implements [`Stream`], yielding `Result<Message<T>, Error>` items.
/// Construct one via [`Consumer::builder`].
pub struct Consumer<T: DeserializeMessage, Exe: Executor> {
    /// The wrapped single- or multi-topic implementation.
    inner: InnerConsumer<T, Exe>,
}
impl<T: DeserializeMessage, Exe: Executor> Consumer<T, Exe> {
    /// Creates a [`ConsumerBuilder`] tied to the given client.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn builder(pulsar: &Pulsar<Exe>) -> ConsumerBuilder<Exe> {
        ConsumerBuilder::new(pulsar)
    }

    /// Checks that the connection(s) backing this consumer are alive.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn check_connection(&mut self) -> Result<(), Error> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.check_connection().await,
            InnerConsumer::Multi(c) => c.check_connections().await,
        }
    }

    /// Requests broker-side consumer statistics.
    ///
    /// Returns one entry per underlying per-topic consumer (a single-topic
    /// consumer therefore yields a one-element vector).
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn get_stats(&mut self) -> Result<Vec<CommandConsumerStatsResponse>, Error> {
        match &mut self.inner {
            InnerConsumer::Single(c) => Ok(vec![c.get_stats().await?]),
            InnerConsumer::Multi(c) => c.get_stats().await,
        }
    }

    /// Acknowledges a single message.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.ack(msg).await,
            InnerConsumer::Multi(c) => c.ack(msg).await,
        }
    }

    /// Acknowledges a single message by its id.
    ///
    /// `topic` selects the underlying consumer for a multi-topic consumer;
    /// it is ignored for a single-topic one.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn ack_with_id(
        &mut self,
        topic: &str,
        msg_id: MessageIdData,
    ) -> Result<(), ConsumerError> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.ack_with_id(msg_id).await,
            InnerConsumer::Multi(c) => c.ack_with_id(topic, msg_id).await,
        }
    }

    /// Acknowledges the given message and every message before it on the
    /// same topic.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn cumulative_ack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.cumulative_ack(msg).await,
            InnerConsumer::Multi(c) => c.cumulative_ack(msg).await,
        }
    }

    /// Cumulatively acknowledges up to the given message id.
    ///
    /// `topic` selects the underlying consumer for a multi-topic consumer;
    /// it is ignored for a single-topic one.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn cumulative_ack_with_id(
        &mut self,
        topic: &str,
        msg_id: MessageIdData,
    ) -> Result<(), ConsumerError> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.cumulative_ack_with_id(msg_id).await,
            InnerConsumer::Multi(c) => c.cumulative_ack_with_id(topic, msg_id).await,
        }
    }

    /// Negatively acknowledges a message, requesting its redelivery.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn nack(&mut self, msg: &Message<T>) -> Result<(), ConsumerError> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.nack(msg).await,
            InnerConsumer::Multi(c) => c.nack(msg).await,
        }
    }

    /// Negatively acknowledges a message by its id.
    ///
    /// `topic` selects the underlying consumer for a multi-topic consumer;
    /// it is ignored for a single-topic one.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn nack_with_id(
        &mut self,
        topic: &str,
        msg_id: MessageIdData,
    ) -> Result<(), ConsumerError> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.nack_with_id(msg_id).await,
            InnerConsumer::Multi(c) => c.nack_with_id(topic, msg_id).await,
        }
    }

    /// Resets the subscription to a message id or a timestamp.
    ///
    /// After the seek request succeeds, the underlying consumer(s) are
    /// rebuilt from scratch (topic ownership is looked up again) and swapped
    /// into `self`, so the stream can be polled immediately afterwards.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn seek(
        &mut self,
        consumer_ids: Option<Vec<String>>,
        message_id: Option<MessageIdData>,
        timestamp: Option<u64>,
        client: Pulsar<Exe>,
    ) -> Result<(), Error> {
        let inner_consumer: InnerConsumer<T, Exe> = match &mut self.inner {
            InnerConsumer::Single(c) => {
                c.seek(message_id, timestamp).await?;
                let topic = c.topic().to_string();
                let addr = client.lookup_topic(&topic).await?;
                let config = c.config().clone();
                InnerConsumer::Single(TopicConsumer::new(client, topic, addr, config).await?)
            }
            InnerConsumer::Multi(c) => {
                c.seek(consumer_ids, message_id, timestamp).await?;
                let topics = c.topics();
                let config = c.config().clone();
                // Re-resolve the broker address of every topic before
                // recreating the per-topic consumers.
                let addrs =
                    try_join_all(topics.into_iter().map(|topic| client.lookup_topic(topic)))
                        .await?;
                let topic_addr_pair = c.topics.iter().cloned().zip(addrs.iter().cloned());
                let consumers = try_join_all(topic_addr_pair.map(|(topic, addr)| {
                    TopicConsumer::new(client.clone(), topic, addr, config.clone())
                }))
                .await?;
                let consumers: BTreeMap<_, _> = consumers
                    .into_iter()
                    .map(|c| (c.topic(), Box::pin(c)))
                    .collect();
                let topics: VecDeque<String> = consumers.keys().cloned().collect();
                let existing_topics = topics.clone();
                // The multi-topic consumer periodically re-discovers topics
                // matching its regex.
                let topic_refresh = Duration::from_secs(30);
                let refresh = Box::pin(client.executor.interval(topic_refresh).map(drop));
                let namespace = c.namespace.clone();
                let topic_regex = c.topic_regex.clone();
                // `config` (cloned above) is reused here instead of cloning
                // the configuration a second time.
                InnerConsumer::Multi(MultiTopicConsumer {
                    namespace,
                    topic_regex,
                    pulsar: client,
                    consumers,
                    topics,
                    existing_topics,
                    new_consumers: None,
                    refresh,
                    config,
                    disc_last_message_received: None,
                    disc_messages_received: 0,
                })
            }
        };
        self.inner = inner_consumer;
        Ok(())
    }

    /// Removes the subscription from the topic(s).
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn unsubscribe(&mut self) -> Result<(), Error> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.unsubscribe().await,
            InnerConsumer::Multi(c) => c.unsubscribe().await,
        }
    }

    /// Closes the consumer without removing the subscription.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn close(&mut self) -> Result<(), Error> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.close().await,
            InnerConsumer::Multi(c) => c.close().await,
        }
    }

    /// Queries the id of the last message available on each topic.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn get_last_message_id(&mut self) -> Result<Vec<MessageIdData>, Error> {
        match &mut self.inner {
            InnerConsumer::Single(c) => Ok(vec![c.get_last_message_id().await?]),
            InnerConsumer::Multi(c) => c.get_last_message_id().await,
        }
    }

    /// Returns the topics this consumer is currently attached to.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn topics(&self) -> Vec<String> {
        match &self.inner {
            InnerConsumer::Single(c) => vec![c.topic()],
            InnerConsumer::Multi(c) => c.existing_topics(),
        }
    }

    /// Returns the distinct broker URLs this consumer is connected to.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn connections(&mut self) -> Result<Vec<Url>, Error> {
        match &mut self.inner {
            InnerConsumer::Single(c) => Ok(vec![c.connection().await?.url().clone()]),
            InnerConsumer::Multi(c) => {
                let connection_futures = c
                    .consumers
                    .values_mut()
                    .map(|c| c.connection())
                    .collect::<Vec<_>>();
                // Several topics may live on the same broker: collect into a
                // BTreeSet to deduplicate (and order) the URLs.
                Ok(try_join_all(connection_futures)
                    .await?
                    .into_iter()
                    .map(|conn| conn.url().clone())
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect())
            }
        }
    }

    /// Returns the options this consumer was configured with.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn options(&self) -> &ConsumerOptions {
        match &self.inner {
            InnerConsumer::Single(c) => &c.config.options,
            InnerConsumer::Multi(c) => &c.config.options,
        }
    }

    /// Returns the dead letter policy, if one was configured.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn dead_letter_policy(&self) -> Option<&DeadLetterPolicy> {
        match &self.inner {
            InnerConsumer::Single(c) => c.dead_letter_policy.as_ref(),
            InnerConsumer::Multi(c) => c.config.dead_letter_policy.as_ref(),
        }
    }

    /// Returns the subscription name.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn subscription(&self) -> &str {
        match &self.inner {
            InnerConsumer::Single(c) => &c.config.subscription,
            InnerConsumer::Multi(c) => &c.config.subscription,
        }
    }

    /// Returns the subscription type (exclusive, shared, failover, ...).
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn sub_type(&self) -> SubType {
        match &self.inner {
            InnerConsumer::Single(c) => c.config.sub_type,
            InnerConsumer::Multi(c) => c.config.sub_type,
        }
    }

    /// Returns the configured batch size, if any.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn batch_size(&self) -> Option<u32> {
        match &self.inner {
            InnerConsumer::Single(c) => c.config.batch_size,
            InnerConsumer::Multi(c) => c.config.batch_size,
        }
    }

    /// Returns the configured consumer name, if any.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn consumer_name(&self) -> Option<&str> {
        match &self.inner {
            InnerConsumer::Single(c) => &c.config.consumer_name,
            InnerConsumer::Multi(c) => &c.config.consumer_name,
        }
        // `as_deref` is the idiomatic `Option<String>` -> `Option<&str>`.
        .as_deref()
    }

    /// Returns the id(s) of the underlying consumer(s).
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn consumer_id(&self) -> Vec<u64> {
        match &self.inner {
            InnerConsumer::Single(c) => vec![c.consumer_id],
            InnerConsumer::Multi(c) => c.consumers.values().map(|c| c.consumer_id).collect(),
        }
    }

    /// Returns the configured redelivery delay for unacked messages, if any.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn unacked_message_redelivery_delay(&self) -> Option<Duration> {
        match &self.inner {
            InnerConsumer::Single(c) => c.config.unacked_message_redelivery_delay,
            InnerConsumer::Multi(c) => c.config.unacked_message_redelivery_delay,
        }
    }

    /// Returns the timestamp of the most recently received message, if any.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn last_message_received(&self) -> Option<DateTime<Utc>> {
        match &self.inner {
            InnerConsumer::Single(c) => c.last_message_received(),
            InnerConsumer::Multi(c) => c.last_message_received(),
        }
    }

    /// Returns the total number of messages received so far.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn messages_received(&self) -> u64 {
        match &self.inner {
            InnerConsumer::Single(c) => c.messages_received(),
            InnerConsumer::Multi(c) => c.messages_received(),
        }
    }

    /// Fetches the schema of a topic, optionally at a specific version.
    ///
    /// `topic` selects the underlying consumer for a multi-topic consumer;
    /// it is ignored for a single-topic one.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn get_schema(
        &mut self,
        topic: &str,
        version: Option<Vec<u8>>,
    ) -> Result<Option<Schema>, Error> {
        match &mut self.inner {
            InnerConsumer::Single(c) => c.get_schema(version).await,
            InnerConsumer::Multi(c) => c.get_schema(topic, version).await,
        }
    }
}
impl<T: DeserializeMessage + 'static, Exe: Executor> Stream for Consumer<T, Exe> {
    type Item = Result<Message<T>, Error>;
    /// Delegates polling to the wrapped single- or multi-topic consumer.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &mut self.inner {
            InnerConsumer::Single(c) => Pin::new(c).poll_next(cx),
            InnerConsumer::Multi(c) => Pin::new(c).poll_next(cx),
        }
    }
}
#[cfg(test)]
mod tests {
use std::{
collections::{HashMap, HashSet},
iter,
time::{SystemTime, UNIX_EPOCH},
};
use futures::{
future::{select, Either},
StreamExt,
};
use log::LevelFilter;
use regex::Regex;
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
use tokio::time::timeout;
use super::*;
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
use crate::executor::TokioExecutor;
use crate::{
consumer::initial_position::InitialPosition, producer, proto, tests::TEST_LOGGER,
Error as PulsarError, Payload, Pulsar, SerializeMessage,
};
    /// Simple JSON-serializable payload used by the integration tests.
    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
    pub struct TestData {
        // Logical label of the producing test ("a", "b", ...), not
        // necessarily a Pulsar topic name.
        topic: String,
        // Monotonic per-test message number.
        msg: u32,
    }
    /// Expected message metadata, used to verify that properties, keys and
    /// event time survive the trip through the broker (and the DLQ).
    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
    pub struct TestMessageMetadata {
        properties: HashMap<String, String>,
        partition_key: Option<String>,
        ordering_key: Option<Vec<u8>>,
        event_time: Option<u64>,
    }
impl SerializeMessage for &TestData {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?;
Ok(producer::Message {
payload,
..Default::default()
})
}
}
    impl DeserializeMessage for TestData {
        // Deserialization is fallible, so the output is the serde result.
        type Output = Result<TestData, serde_json::Error>;
        /// Decodes the raw payload bytes as JSON.
        fn deserialize_message(payload: &Payload) -> Self::Output {
            serde_json::from_slice(&payload.data)
        }
    }
const DEFAULT_RECV_TIMEOUT: Duration = Duration::from_secs(5);
async fn recv_within<T>(
c: &mut Consumer<T, TokioExecutor>,
dur: Duration,
) -> Result<Message<T>, String>
where
T: DeserializeMessage + 'static,
{
match tokio::time::timeout(dur, c.next()).await {
Err(_) => Err(format!("timed out waiting for next() after {:?}", dur)),
Ok(None) => Err("stream ended (None) while waiting for a message".into()),
Ok(Some(Err(e))) => Err(format!("consumer error: {e:?}")),
Ok(Some(Ok(m))) => Ok(m),
}
}
    /// Logger used by the multi-consumer tests; the tag distinguishes their
    /// output from other tests' logs.
    pub static MULTI_LOGGER: crate::tests::SimpleLogger = crate::tests::SimpleLogger {
        tag: "multi_consumer",
    };
#[tokio::test]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn multi_consumer() {
let _result = log::set_logger(&MULTI_LOGGER);
log::set_max_level(LevelFilter::Debug);
let addr = "pulsar://127.0.0.1:6650";
let topic_n: u16 = rand::random();
let topic1 = format!("multi_consumer_a_{topic_n}");
let topic2 = format!("multi_consumer_b_{topic_n}");
let data1 = TestData {
topic: "a".to_owned(),
msg: 1,
};
let data2 = TestData {
topic: "a".to_owned(),
msg: 2,
};
let data3 = TestData {
topic: "b".to_owned(),
msg: 3,
};
let data4 = TestData {
topic: "b".to_owned(),
msg: 4,
};
let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
try_join_all(vec![
client.send(&topic1, &data1),
client.send(&topic1, &data2),
client.send(&topic2, &data3),
client.send(&topic2, &data4),
])
.await
.unwrap();
let builder = client
.consumer()
.with_subscription_type(SubType::Shared)
.with_options(ConsumerOptions {
initial_position: InitialPosition::Earliest,
..Default::default()
});
let mut consumer_1: Consumer<TestData, _> = builder
.clone()
.with_subscription("consumer_1")
.with_consumer_name("consumer_1")
.with_topics([&topic1, &topic2])
.with_subscription_type(SubType::Exclusive)
.build()
.await
.unwrap();
assert!(&consumer_1.check_connection().await.is_ok());
let receive_queue_size = consumer_1.options().receiver_queue_size;
assert_eq!(receive_queue_size, None);
let mut consumer_2: Consumer<TestData, _> = builder
.clone()
.with_subscription("consumer_2")
.with_topic_regex(Regex::new(&format!("multi_consumer_[ab]_{topic_n}")).unwrap())
.build()
.await
.unwrap();
let expected: HashSet<_> = vec![data1, data2, data3, data4].into_iter().collect();
for consumer in [&mut consumer_1, &mut consumer_2].iter_mut() {
let connected_topics = consumer.topics();
debug!(
"connected topics for {}: {:?}",
consumer.subscription(),
&connected_topics
);
assert_eq!(connected_topics.len(), 2);
assert!(connected_topics.iter().any(|t| t.ends_with(&topic1)));
assert!(connected_topics.iter().any(|t| t.ends_with(&topic2)));
let mut received = HashSet::new();
while let Some(message) = timeout(Duration::from_secs(1), consumer.next())
.await
.unwrap()
{
let msg = message.unwrap();
received.insert(msg.deserialize().unwrap());
if received.len() == 2 {
consumer
.ack_with_id(msg.topic.as_str(), msg.message_id.id)
.await
.unwrap();
} else {
consumer.ack(&msg).await.unwrap();
}
if received.len() == 4 {
break;
}
}
assert_eq!(expected, received);
assert_eq!(consumer.messages_received(), 4);
assert!(consumer.last_message_received().is_some());
}
let stats = consumer_1.get_stats().await.unwrap();
assert_eq!(stats.len(), 2);
for stat in stats {
assert_eq!(stat.consumer_name().to_string(), "consumer_1");
}
let data5 = TestData {
topic: "c".to_owned(),
msg: 5,
};
let data6 = TestData {
topic: "c".to_owned(),
msg: 6,
};
try_join_all(vec![
client.send(&topic1, &data5),
client.send(&topic1, &data6),
])
.await
.unwrap();
let mut latest_msg = None;
for i in 0..2 {
let msg = recv_within(&mut consumer_1, DEFAULT_RECV_TIMEOUT).await;
println!("consumer_1 receive {}: {:?}", i, msg);
latest_msg = Some(msg.unwrap());
}
let r = consumer_1.cumulative_ack(&latest_msg.unwrap()).await;
assert!(r.is_ok());
consumer_1.close().await.unwrap();
let data6 = TestData {
topic: "d".to_owned(),
msg: 7,
};
client.send(&topic1, &data6).await.unwrap();
consumer_1 = builder
.clone()
.with_subscription("consumer_1")
.with_consumer_name("consumer_1")
.with_topics([&topic1])
.with_subscription_type(SubType::Shared)
.build::<TestData>()
.await
.unwrap();
let msg = recv_within(&mut consumer_1, DEFAULT_RECV_TIMEOUT)
.await
.unwrap();
assert_eq!(data6, msg.deserialize().unwrap());
for consumer in [&mut consumer_1, &mut consumer_2].iter_mut() {
consumer.unsubscribe().await.unwrap();
consumer.close().await.unwrap();
}
let consumer_1_exclusive = builder
.clone()
.with_subscription("consumer_1")
.with_consumer_name("consumer_1")
.with_topics([&topic1, &topic2])
.with_subscription_type(SubType::Exclusive)
.build::<TestData>()
.await;
assert!(consumer_1_exclusive.is_ok());
}
#[tokio::test]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn consumer_zero_receiver_queue_size() {
let _result = log::set_logger(&TEST_LOGGER);
log::set_max_level(LevelFilter::Debug);
let addr = "pulsar://127.0.0.1:6650";
let topic = format!(
"consumer_zero_receiver_queue_size_{}",
rand::random::<u16>()
);
let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
let consumer: Consumer<TestData, _> = client
.consumer()
.with_topic(&topic)
.with_subscription("dropped_ack")
.with_subscription_type(SubType::Shared)
.with_options(
ConsumerOptions::default()
.with_receiver_queue_size(0)
.with_initial_position(InitialPosition::Earliest),
)
.build()
.await
.unwrap();
let size = consumer.options().receiver_queue_size.unwrap();
assert_eq!(size, 1000);
}
#[tokio::test]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn consumer_dropped_with_lingering_acks() {
use rand::{distributions::Alphanumeric, Rng};
let _result = log::set_logger(&TEST_LOGGER);
log::set_max_level(LevelFilter::Debug);
let addr = "pulsar://127.0.0.1:6650";
let topic = format!(
"consumer_dropped_with_lingering_acks_{}",
rand::random::<u16>()
);
let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
let message = TestData {
topic: iter::repeat(())
.map(|()| rand::thread_rng().sample(Alphanumeric) as char)
.take(8)
.collect(),
msg: 1,
};
client.send(&topic, &message).await.unwrap().await.unwrap();
println!("producer sends done");
{
println!("creating consumer");
let mut consumer: Consumer<TestData, _> = client
.consumer()
.with_topic(&topic)
.with_subscription("dropped_ack")
.with_subscription_type(SubType::Shared)
.with_options(
ConsumerOptions::default()
.with_receiver_queue_size(2000)
.with_initial_position(InitialPosition::Earliest),
)
.build()
.await
.unwrap();
println!("created consumer");
let receive_queue_size = consumer.options().receiver_queue_size;
assert_eq!(receive_queue_size, Some(2000));
let msg: Message<TestData> = timeout(Duration::from_secs(1), consumer.next())
.await
.unwrap()
.unwrap()
.unwrap();
println!("got message: {:?}", msg.payload);
assert_eq!(
message,
msg.deserialize().unwrap(),
"we probably receive a message from a previous run of the test"
);
consumer.ack(&msg).await.unwrap();
}
{
println!("creating second consumer. The message should have been acked");
let mut consumer: Consumer<TestData, _> = client
.consumer()
.with_topic(&topic)
.with_consumer_name("dropped_ack")
.with_subscription("dropped_ack")
.with_subscription_type(SubType::Shared)
.with_options(ConsumerOptions {
initial_position: InitialPosition::Earliest,
..Default::default()
})
.build()
.await
.unwrap();
println!("created second consumer");
assert!(consumer.check_connection().await.is_ok());
let stats = consumer.get_stats().await.unwrap();
assert_eq!(stats.len(), 1);
for stat in stats {
assert_eq!(stat.consumer_name().to_string(), "dropped_ack");
}
let res: Result<_, tokio::time::error::Elapsed> =
tokio::time::timeout(Duration::from_secs(1), consumer.next()).await;
let is_err = res.is_err();
if let Ok(val) = res {
let msg = val.unwrap().unwrap();
println!("got message: {:?}", msg.payload);
consumer.ack(&msg).await.unwrap();
assert_eq!(message, msg.deserialize().unwrap());
}
assert!(
is_err,
"waiting for a message should have timed out, since we already acknowledged the \
only message in the queue"
);
consumer.unsubscribe().await.unwrap();
consumer.close().await.unwrap();
}
}
#[tokio::test]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn dead_letter_queue() {
let _result = log::set_logger(&TEST_LOGGER);
log::set_max_level(LevelFilter::Debug);
let addr = "pulsar://127.0.0.1:6650";
let test_id: u16 = rand::random();
let topic = format!("dead_letter_queue_test_{test_id}");
let test_msg: u32 = rand::random();
let message_data = TestData {
topic: topic.clone(),
msg: test_msg,
};
let message_metadata = TestMessageMetadata {
properties: HashMap::from([
("k_1".to_string(), "v_1".to_string()),
("k_2".to_string(), "v_2".to_string()),
]),
partition_key: Some("partition_key".to_string()),
ordering_key: Some(b"ordering_key".to_vec()),
event_time: Some(rand::random()),
};
let mut message = <&TestData>::serialize_message(&message_data).unwrap();
message.properties = message_metadata.properties.clone();
message.partition_key = message_metadata.partition_key.clone();
message.ordering_key = message_metadata.ordering_key.clone();
message.event_time = message_metadata.event_time;
let dead_letter_topic = format!("{topic}_dlq");
let dead_letter_policy = DeadLetterPolicy {
max_redeliver_count: 1,
dead_letter_topic: dead_letter_topic.clone(),
};
let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
println!("creating consumer");
let mut consumer: Consumer<TestData, _> = client
.consumer()
.with_topic(topic.clone())
.with_subscription("nack")
.with_subscription_type(SubType::Shared)
.with_dead_letter_policy(dead_letter_policy)
.build()
.await
.unwrap();
println!("created consumer");
println!("creating second consumer that consumes from the DLQ");
let mut dlq_consumer: Consumer<TestData, _> = client
.clone()
.consumer()
.with_topic(dead_letter_topic)
.with_subscription("dead_letter_topic")
.with_subscription_type(SubType::Shared)
.build()
.await
.unwrap();
println!("created second consumer");
client.send(&topic, message).await.unwrap().await.unwrap();
println!("producer sends done");
let msg = recv_within(&mut consumer, DEFAULT_RECV_TIMEOUT)
.await
.unwrap();
println!("got message: {:?}", msg.payload);
assert_eq!(
message_data,
msg.deserialize().unwrap(),
"we probably received a message from a previous run of the test"
);
consumer.nack(&msg).await.unwrap();
let dlq_msg = recv_within(&mut dlq_consumer, DEFAULT_RECV_TIMEOUT)
.await
.unwrap();
println!("got message: {:?}", dlq_msg.payload);
assert_eq!(
message_data,
dlq_msg.deserialize().unwrap(),
"we probably received a message from a previous run of the test"
);
let mut expected_properties = message_metadata.properties;
expected_properties
.entry("REAL_TOPIC".to_string())
.or_insert_with(|| topic.clone());
expected_properties
.entry("ORIGIN_MESSAGE_ID".to_string())
.or_insert_with(|| {
format!(
"{}:{}:{}",
msg.message_id().ledger_id,
msg.message_id().entry_id,
msg.message_id().partition.unwrap_or(-1)
)
});
assert_eq!(
expected_properties,
dlq_msg
.metadata()
.properties
.iter()
.map(|p| (p.key.clone(), p.value.clone()))
.collect::<HashMap<_, _>>(),
"message properties should be preserved when the message is sent to the DLQ"
);
assert_eq!(
message_metadata.partition_key,
dlq_msg.metadata().partition_key,
"message partition key should be preserved when the message is sent to the DLQ"
);
assert_eq!(
message_metadata.ordering_key,
dlq_msg.metadata().ordering_key,
"message ordering key should be preserved when the message is sent to the DLQ"
);
assert_eq!(
message_metadata.event_time,
dlq_msg.metadata().event_time,
"message event time should be preserved when the message is sent to the DLQ"
);
dlq_consumer.ack(&dlq_msg).await.unwrap();
}
#[tokio::test]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn dead_letter_queue_batched() {
use crate::ProducerOptions;
let _result = log::set_logger(&TEST_LOGGER);
log::set_max_level(LevelFilter::Debug);
let addr = "pulsar://127.0.0.1:6650";
let test_id: u16 = rand::random();
let topic = format!("dead_letter_queue_batched_test_{test_id}");
let dead_letter_topic = format!("{topic}_dlq");
let dead_letter_policy = DeadLetterPolicy {
max_redeliver_count: 1,
dead_letter_topic: dead_letter_topic.clone(),
};
let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
println!("creating consumer");
let mut consumer: Consumer<TestData, _> = client
.consumer()
.with_topic(topic.clone())
.with_subscription("nack")
.with_subscription_type(SubType::Shared)
.with_dead_letter_policy(dead_letter_policy)
.build()
.await
.unwrap();
println!("created consumer");
println!("creating second consumer that consumes from the DLQ");
let mut dlq_consumer: Consumer<TestData, _> = client
.clone()
.consumer()
.with_topic(dead_letter_topic)
.with_subscription("dead_letter_topic")
.with_subscription_type(SubType::Shared)
.build()
.await
.unwrap();
println!("created second consumer");
let mut producer = client
.producer()
.with_topic(&topic)
.with_options(ProducerOptions {
batch_size: Some(2),
..Default::default()
})
.build()
.await
.unwrap();
let message_datas = vec![
(
TestData {
topic: topic.clone(),
msg: rand::random(),
},
TestMessageMetadata {
properties: HashMap::from([
("k_1_1".to_string(), "v_1_1".to_string()),
("k_2_1".to_string(), "v_2_1".to_string()),
]),
partition_key: Some("partition_key_1".to_string()),
ordering_key: Some(b"ordering_key_1".to_vec()),
event_time: Some(rand::random()),
},
),
(
TestData {
topic: topic.clone(),
msg: rand::random(),
},
TestMessageMetadata {
properties: HashMap::from([
("k_1_2".to_string(), "v_1_2".to_string()),
("k_2_2".to_string(), "v_2_2".to_string()),
]),
partition_key: Some("partition_key_2".to_string()),
ordering_key: Some(b"ordering_key_2".to_vec()),
event_time: Some(rand::random()),
},
),
];
let messages = message_datas
.iter()
.map(|(data, metadata)| {
let mut message = <&TestData>::serialize_message(data).unwrap();
message.properties = metadata.properties.clone();
message.partition_key = metadata.partition_key.clone();
message.ordering_key = metadata.ordering_key.clone();
message.event_time = metadata.event_time;
message
})
.collect::<Vec<_>>();
let receipts = producer.send_all(messages).await.unwrap();
producer.send_batch().await.unwrap();
try_join_all(receipts).await.unwrap();
println!("producer sends done");
for (message_data, message_metadata) in message_datas {
let msg = recv_within(&mut consumer, DEFAULT_RECV_TIMEOUT)
.await
.unwrap();
println!("got message: {:?}", msg.payload);
assert_eq!(
message_data,
msg.deserialize().unwrap(),
"we probably received a message from a previous run of the test"
);
consumer.nack(&msg).await.unwrap();
let dlq_msg = recv_within(&mut dlq_consumer, DEFAULT_RECV_TIMEOUT)
.await
.unwrap();
println!("got message: {:?}", dlq_msg.payload);
assert_eq!(
message_data,
dlq_msg.deserialize().unwrap(),
"we probably received a message from a previous run of the test"
);
let mut expected_properties = message_metadata.properties;
expected_properties
.entry("REAL_TOPIC".to_string())
.or_insert_with(|| topic.clone());
expected_properties
.entry("ORIGIN_MESSAGE_ID".to_string())
.or_insert_with(|| {
format!(
"{}:{}:{}:{}",
msg.message_id().ledger_id,
msg.message_id().entry_id,
msg.message_id().partition.unwrap_or(-1),
msg.message_id().batch_index.unwrap_or(-1)
)
});
assert_eq!(
expected_properties,
dlq_msg
.metadata()
.properties
.iter()
.map(|p| (p.key.clone(), p.value.clone()))
.collect::<HashMap<_, _>>(),
"message properties should be preserved when the message is sent to the DLQ"
);
assert_eq!(
message_metadata.partition_key,
dlq_msg.metadata().partition_key,
"message partition key should be preserved when the message is sent to the DLQ"
);
assert_eq!(
message_metadata.ordering_key,
dlq_msg.metadata().ordering_key,
"message ordering key should be preserved when the message is sent to the DLQ"
);
assert_eq!(
message_metadata.event_time,
dlq_msg.metadata().event_time,
"message event time should be preserved when the message is sent to the DLQ"
);
dlq_consumer.ack(&dlq_msg).await.unwrap();
}
}
#[tokio::test]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn failover() {
let _result = log::set_logger(&MULTI_LOGGER);
log::set_max_level(LevelFilter::Debug);
let addr = "pulsar://127.0.0.1:6650";
let topic = format!("failover_{}", rand::random::<u16>());
let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
let msg_count = 100_u32;
try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string())))
.await
.unwrap();
let builder = client
.consumer()
.with_subscription("failover")
.with_topic(&topic)
.with_subscription_type(SubType::Failover)
.with_options(ConsumerOptions {
initial_position: InitialPosition::Earliest,
..Default::default()
});
let mut consumer_1: Consumer<String, _> = builder.clone().build().await.unwrap();
let mut consumer_2: Consumer<String, _> = builder.build().await.unwrap();
let mut consumed_1 = 0_u32;
let mut consumed_2 = 0_u32;
let mut pending_1 = Some(consumer_1.next());
let mut pending_2 = Some(consumer_2.next());
while consumed_1 + consumed_2 < msg_count {
let next = select(pending_1.take().unwrap(), pending_2.take().unwrap());
match timeout(Duration::from_secs(2), next).await.unwrap() {
Either::Left((msg, pending)) => {
consumed_1 += 1;
drop(consumer_1.ack(&msg.unwrap().unwrap()));
pending_1 = Some(consumer_1.next());
pending_2 = Some(pending);
}
Either::Right((msg, pending)) => {
consumed_2 += 1;
drop(consumer_2.ack(&msg.unwrap().unwrap()));
pending_1 = Some(pending);
pending_2 = Some(consumer_2.next());
}
}
}
match (consumed_1, consumed_2) {
(consumed_1, 0) => assert_eq!(consumed_1, msg_count),
(0, consumed_2) => assert_eq!(consumed_2, msg_count),
_ => panic!(
"Expected one consumer to consume all messages. Message count: {msg_count}, consumer_1: {consumed_1} \
consumer_2: {consumed_2}"
),
}
}
#[tokio::test]
#[cfg(any(
feature = "tokio-runtime",
feature = "tokio-rustls-runtime-aws-lc-rs",
feature = "tokio-rustls-runtime-ring"
))]
async fn seek_single_consumer() {
    // Seeks a single-topic consumer back to a publish timestamp and verifies
    // that every message published at or after that instant is redelivered —
    // including the first half that was already acked before the seek.
    let _result = log::set_logger(&MULTI_LOGGER);
    log::set_max_level(LevelFilter::Debug);
    log::info!("starting seek test");
    let addr = "pulsar://127.0.0.1:6650";
    let topic = format!("seek_{}", rand::random::<u16>());
    let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
    let msg_count = 100_u32;
    let start_time: u64 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;
    // Put a margin between `start_time` and the first publish so the seek
    // target is unambiguously earlier than every message. Use the async
    // sleep here: `std::thread::sleep` would block the tokio worker thread
    // for the whole two seconds, stalling any other task scheduled on it.
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("this is the starting time: {start_time}");
    try_join_all((0..msg_count).map(|i| client.send(&topic, i.to_string())))
        .await
        .unwrap();
    log::info!("sent all messages");
    let mut consumer_1: Consumer<String, _> = client
        .consumer()
        .with_consumer_name("seek_single_test")
        .with_subscription("seek_single_test")
        .with_subscription_type(SubType::Shared)
        .with_topic(&topic)
        .with_options(ConsumerOptions {
            initial_position: InitialPosition::Earliest,
            ..Default::default()
        })
        .build()
        .await
        .unwrap();
    log::info!("built the consumer");
    // First pass: consume and ack half of the stream.
    let mut consumed_1 = 0_u32;
    while consumed_1 < msg_count / 2 {
        let msg = recv_within(&mut consumer_1, DEFAULT_RECV_TIMEOUT)
            .await
            .expect("first loop failed to receive");
        let publish_time = msg.metadata().publish_time;
        let data = msg.deserialize().expect("deserialize");
        log::info!(
            "first loop, got {} messages, content: {}, publish time: {}",
            consumed_1 + 1,
            data,
            publish_time
        );
        consumer_1.ack(&msg).await.unwrap();
        consumed_1 += 1;
    }
    log::info!("first loop, received {} messages, so break", consumed_1);
    log::info!("calling seek method");
    // Run the seek on a spawned task (seek takes `client` by value), then
    // take the consumer back once the rewind has completed.
    let mut consumer_1 = tokio::task::spawn(async move {
        consumer_1
            .seek(None, None, Some(start_time), client)
            .await
            .unwrap();
        consumer_1
    })
    .await
    .unwrap();
    log::info!("reading messages again");
    // Second pass: after seeking to `start_time`, all 100 messages should
    // be redelivered, acks from the first pass notwithstanding.
    let mut consumed_2 = 0_u32;
    while consumed_2 < msg_count {
        let msg = recv_within(&mut consumer_1, DEFAULT_RECV_TIMEOUT)
            .await
            .expect("second loop failed to receive after seek");
        let publish_time = msg.metadata().publish_time;
        let data = msg.deserialize().expect("deserialize");
        log::info!(
            "second loop, got {} messages, content: {}, publish time: {}",
            consumed_2 + 1,
            data,
            publish_time
        );
        consumer_1.ack(&msg).await.unwrap();
        consumed_2 += 1;
    }
    log::info!("received {} messages in second loop, so break", consumed_2);
    assert_eq!(50, consumed_1);
    assert_eq!(100, consumed_2);
}
#[tokio::test]
#[cfg(any(
    feature = "tokio-runtime",
    feature = "tokio-rustls-runtime-aws-lc-rs",
    feature = "tokio-rustls-runtime-ring"
))]
async fn schema_test() {
    // End-to-end check that a producer-declared JSON schema is registered
    // with the broker, stamped onto delivered messages as a schema_version,
    // and retrievable again through `Consumer::get_schema`.
    #[derive(Serialize, Deserialize)]
    struct TestData {
        age: i32,
        name: String,
    }
    impl SerializeMessage for TestData {
        fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
            let payload =
                serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?;
            Ok(producer::Message {
                payload,
                ..Default::default()
            })
        }
    }
    impl DeserializeMessage for TestData {
        type Output = Result<TestData, serde_json::Error>;
        fn deserialize_message(payload: &Payload) -> Self::Output {
            serde_json::from_slice(&payload.data)
        }
    }
    let _result = log::set_logger(&MULTI_LOGGER);
    log::set_max_level(LevelFilter::Debug);
    log::info!("starting schema test");
    let addr = "pulsar://127.0.0.1:6650";
    let topic = format!("schema_{}", rand::random::<u16>());
    let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
    // Avro-style JSON schema declared by the producer; the broker should
    // hand the very same document back when the schema is fetched below.
    let json_schema = serde_json::json!({
        "type": "record",
        "name": "TestRecord",
        "namespace": "com.example",
        "fields": [
            {
                "name": "name",
                "type": "string"
            },
            {
                "name": "age",
                "type": "int"
            }
        ]
    });
    let schema_data = serde_json::to_vec(&json_schema).unwrap();
    let mut producer = client
        .producer()
        .with_topic(&topic)
        .with_name("schema producer")
        .with_options(producer::ProducerOptions {
            schema: Some(proto::Schema {
                r#type: proto::schema::Type::Json as i32,
                schema_data,
                ..Default::default()
            }),
            ..Default::default()
        })
        .build()
        .await
        .unwrap();
    // The oneshot makes sure the consumer is subscribed before the single
    // message is produced, so it cannot be missed.
    let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
    let join_handle = tokio::spawn(async move {
        let mut consumer: Consumer<TestData, _> = client
            .consumer()
            // Fixed: these names were copy-pasted from the seek test
            // ("seek_single_test"); use test-specific names so broker-side
            // stats and logs are attributable to this test.
            .with_consumer_name("schema_test")
            .with_subscription("schema_test")
            .with_subscription_type(SubType::Shared)
            .with_topic(&topic)
            .build()
            .await
            .unwrap();
        log::info!("built the consumer");
        tx.send(true).unwrap();
        if let Ok(msg) = recv_within(&mut consumer, DEFAULT_RECV_TIMEOUT).await {
            // Resolve the broker-assigned schema_version back to the full
            // schema and check it round-trips to the declared document.
            let schema_version = msg.payload.metadata.schema_version.clone();
            let schema = consumer
                .get_schema(&topic, schema_version)
                .await
                .unwrap()
                .unwrap();
            assert_eq!(schema.r#type, proto::schema::Type::Json as i32);
            let schema_resolved: serde_json::Value =
                serde_json::from_slice(&schema.schema_data).unwrap();
            assert_eq!(json_schema, schema_resolved);
            consumer.ack(&msg).await.unwrap();
        } else {
            panic!("timed out waiting for schema_test message");
        }
    });
    let consumer_created = rx.await.unwrap();
    assert!(consumer_created);
    // Deprecated two-stage send: the first await enqueues, the second
    // awaits the broker receipt.
    #[allow(deprecated)]
    producer
        .send(TestData {
            age: 30,
            name: "test".to_string(),
        })
        .await
        .unwrap()
        .await
        .unwrap();
    join_handle.await.unwrap();
}
/// JSON-serializable payload shared by the schema_version tests below.
#[derive(Serialize, Deserialize)]
struct SchemaTestData {
    age: i32,
    name: String,
}
impl SerializeMessage for SchemaTestData {
    /// Encodes the record as JSON; a serialization failure is surfaced as
    /// `PulsarError::Custom` carrying the serde error text.
    fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
        match serde_json::to_vec(&input) {
            Ok(payload) => Ok(producer::Message {
                payload,
                ..Default::default()
            }),
            Err(e) => Err(PulsarError::Custom(e.to_string())),
        }
    }
}
impl DeserializeMessage for SchemaTestData {
    // Decoding can fail on malformed payloads, so each test unwraps the
    // Result explicitly.
    type Output = Result<SchemaTestData, serde_json::Error>;

    /// Decodes the message payload bytes as JSON.
    fn deserialize_message(payload: &Payload) -> Self::Output {
        serde_json::from_slice(payload.data.as_slice())
    }
}
/// Builds the serialized Avro-style JSON schema used by the schema_version
/// tests, parameterized on the record `name`.
fn test_json_schema(name: &str) -> Vec<u8> {
    serde_json::to_vec(&serde_json::json!({
        "type": "record",
        "name": name,
        "namespace": "com.example",
        "fields": [
            { "name": "name", "type": "string" },
            { "name": "age", "type": "int" }
        ]
    }))
    .unwrap()
}
/// Installs the shared test logger at debug level. The `set_logger` result
/// is deliberately discarded: it errors once a logger is already installed,
/// which happens whenever more than one test runs in the same process.
fn init_logger() {
    let _ = log::set_logger(&MULTI_LOGGER);
    log::set_max_level(LevelFilter::Debug);
}
/// Connects a fresh Pulsar client to the local standalone broker used by
/// the integration tests.
async fn new_client() -> Pulsar<TokioExecutor> {
    Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
        .build()
        .await
        .unwrap()
}
/// Builds a producer on `topic` that declares a JSON schema whose record is
/// named `schema_name`. The `extra` hook may adjust the producer options
/// (e.g. enable batching) before the producer is created.
async fn schema_producer(
    client: &Pulsar<TokioExecutor>,
    topic: &str,
    name: &str,
    schema_name: &str,
    extra: impl FnOnce(&mut producer::ProducerOptions),
) -> producer::Producer<TokioExecutor> {
    let schema = proto::Schema {
        r#type: proto::schema::Type::Json as i32,
        schema_data: test_json_schema(schema_name),
        ..Default::default()
    };
    let mut options = producer::ProducerOptions {
        schema: Some(schema),
        ..Default::default()
    };
    extra(&mut options);
    client
        .producer()
        .with_topic(topic)
        .with_name(name)
        .with_options(options)
        .build()
        .await
        .unwrap()
}
/// Builds an exclusive consumer on `topic` that starts from the earliest
/// available message, so messages produced before the subscription was
/// created are still delivered.
async fn earliest_consumer<T: DeserializeMessage>(
    client: &Pulsar<TokioExecutor>,
    topic: &str,
    name: &str,
    sub: &str,
) -> Consumer<T, TokioExecutor> {
    let options = ConsumerOptions {
        initial_position: InitialPosition::Earliest,
        ..Default::default()
    };
    client
        .consumer()
        .with_consumer_name(name)
        .with_subscription(sub)
        .with_subscription_type(SubType::Exclusive)
        .with_topic(topic)
        .with_options(options)
        .build()
        .await
        .unwrap()
}
#[tokio::test]
#[cfg(any(
    feature = "tokio-runtime",
    feature = "tokio-rustls-runtime-aws-lc-rs",
    feature = "tokio-rustls-runtime-ring"
))]
async fn schema_version_set_on_produced_messages() {
    // A message produced through a producer that declared a JSON schema
    // should arrive carrying a non-empty `schema_version` in its metadata.
    init_logger();
    let topic = format!("schema_version_test_{}", rand::random::<u16>());
    let client = new_client().await;
    let mut producer = schema_producer(
        &client,
        &topic,
        "schema_version_producer",
        "SchemaVersionTest",
        |_| {},
    )
    .await;
    let mut consumer: Consumer<SchemaTestData, _> = earliest_consumer(
        &client,
        &topic,
        "schema_version_consumer",
        "schema_version_sub",
    )
    .await;
    // Deprecated two-stage send: the first await enqueues the message, the
    // second awaits the result it resolves to — presumably the broker
    // receipt, so the message is persisted before we try to receive it.
    #[allow(deprecated)]
    producer
        .send(SchemaTestData {
            age: 30,
            name: "test".to_string(),
        })
        .await
        .unwrap()
        .await
        .unwrap();
    let msg = recv_within(&mut consumer, DEFAULT_RECV_TIMEOUT)
        .await
        .unwrap();
    // Present AND non-empty: an empty byte sequence would also indicate the
    // broker-assigned version was not propagated.
    assert!(
        msg.payload
            .metadata
            .schema_version
            .as_ref()
            .is_some_and(|sv| !sv.is_empty()),
        "schema_version should be a non-empty byte sequence, got {:?}",
        msg.payload.metadata.schema_version
    );
    consumer.ack(&msg).await.unwrap();
}
#[tokio::test]
#[cfg(any(
    feature = "tokio-runtime",
    feature = "tokio-rustls-runtime-aws-lc-rs",
    feature = "tokio-rustls-runtime-ring"
))]
async fn schema_version_set_on_batched_messages() {
    // With batching enabled, every entry of a batch should still carry a
    // schema_version, and the value should be identical across the batch.
    init_logger();
    let topic = format!("schema_version_batch_test_{}", rand::random::<u16>());
    let client = new_client().await;
    let mut producer = schema_producer(
        &client,
        &topic,
        "schema_version_batch_producer",
        "SchemaVersionBatchTest",
        |opts| {
            // batch_size = 2: presumably flushes once two messages are
            // queued, so the two sends below land in a single batch.
            opts.batch_size = Some(2);
        },
    )
    .await;
    let mut consumer: Consumer<SchemaTestData, _> = earliest_consumer(
        &client,
        &topic,
        "schema_version_batch_consumer",
        "schema_version_batch_sub",
    )
    .await;
    // Enqueue both messages without awaiting receipts in between, then
    // await both receipts together once the batch has been sent.
    let receipt1 = producer
        .send_non_blocking(SchemaTestData {
            age: 25,
            name: "batch1".to_string(),
        })
        .await
        .unwrap();
    let receipt2 = producer
        .send_non_blocking(SchemaTestData {
            age: 35,
            name: "batch2".to_string(),
        })
        .await
        .unwrap();
    futures::try_join!(receipt1, receipt2).unwrap();
    let mut schema_versions = Vec::new();
    for _ in 0..2 {
        let msg = recv_within(&mut consumer, DEFAULT_RECV_TIMEOUT)
            .await
            .unwrap();
        let sv = msg.payload.metadata.schema_version.clone();
        assert!(
            sv.is_some(),
            "schema_version should be set on batched messages produced with a schema"
        );
        schema_versions.push(sv.unwrap());
        consumer.ack(&msg).await.unwrap();
    }
    assert_eq!(
        schema_versions[0], schema_versions[1],
        "both messages in the same batch should carry the same schema_version"
    );
}
#[tokio::test]
#[cfg(any(
    feature = "tokio-runtime",
    feature = "tokio-rustls-runtime-aws-lc-rs",
    feature = "tokio-rustls-runtime-ring"
))]
async fn user_provided_schema_version_not_overwritten() {
    // When `serialize_message` supplies its own schema_version, the client
    // must not replace it with the broker-assigned one, even though the
    // producer declared a schema.
    #[derive(Serialize, Deserialize)]
    struct DataWithCustomSchemaVersion {
        value: String,
    }
    // Sentinel bytes that cannot collide with a real broker-assigned version.
    const CUSTOM_SCHEMA_VERSION: &[u8] = &[42, 42, 42, 42];
    impl SerializeMessage for DataWithCustomSchemaVersion {
        fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
            let payload =
                serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?;
            Ok(producer::Message {
                payload,
                // Explicit user-provided version — the subject of this test.
                schema_version: Some(CUSTOM_SCHEMA_VERSION.to_vec()),
                ..Default::default()
            })
        }
    }
    impl DeserializeMessage for DataWithCustomSchemaVersion {
        type Output = Result<DataWithCustomSchemaVersion, serde_json::Error>;
        fn deserialize_message(payload: &Payload) -> Self::Output {
            serde_json::from_slice(&payload.data)
        }
    }
    init_logger();
    let topic = format!("schema_version_override_test_{}", rand::random::<u16>());
    let client = new_client().await;
    let mut producer = schema_producer(
        &client,
        &topic,
        "schema_version_override_producer",
        "SchemaVersionOverrideTest",
        |_| {},
    )
    .await;
    let mut consumer: Consumer<DataWithCustomSchemaVersion, _> = earliest_consumer(
        &client,
        &topic,
        "schema_version_override_consumer",
        "schema_version_override_sub",
    )
    .await;
    // Deprecated two-stage send: enqueue, then await the second future
    // before receiving.
    #[allow(deprecated)]
    producer
        .send(DataWithCustomSchemaVersion {
            value: "override_test".to_string(),
        })
        .await
        .unwrap()
        .await
        .unwrap();
    let msg = recv_within(&mut consumer, DEFAULT_RECV_TIMEOUT)
        .await
        .unwrap();
    assert_eq!(
        msg.payload.metadata.schema_version.as_deref(),
        Some(CUSTOM_SCHEMA_VERSION),
        "user-provided schema_version should not be overwritten by the broker-assigned version"
    );
    consumer.ack(&msg).await.unwrap();
}
#[tokio::test]
#[cfg(any(
    feature = "tokio-runtime",
    feature = "tokio-rustls-runtime-aws-lc-rs",
    feature = "tokio-rustls-runtime-ring"
))]
async fn schemaless_producer_has_no_schema_version() {
    // Messages from a producer that declared NO schema must not carry a
    // (non-empty) schema_version.
    init_logger();
    let topic = format!("schemaless_sv_test_{}", rand::random::<u16>());
    let client = new_client().await;
    // Plain producer: no ProducerOptions, hence no schema declared.
    let mut producer = client
        .producer()
        .with_topic(&topic)
        .with_name("schemaless_sv_producer")
        .build()
        .await
        .unwrap();
    // NOTE(review): `TestData` here (fields `topic`/`msg`) is presumably the
    // module-level test payload type defined elsewhere in this file, not the
    // one local to `schema_test` (fields `age`/`name`) — confirm.
    let mut consumer: Consumer<TestData, _> = earliest_consumer(
        &client,
        &topic,
        "schemaless_sv_consumer",
        "schemaless_sv_sub",
    )
    .await;
    // Sent by reference — SerializeMessage is presumably also implemented
    // for `&TestData` elsewhere in this file; verify against that impl.
    #[allow(deprecated)]
    producer
        .send(&TestData {
            topic: "schemaless".to_string(),
            msg: 1,
        })
        .await
        .unwrap()
        .await
        .unwrap();
    let msg = recv_within(&mut consumer, DEFAULT_RECV_TIMEOUT)
        .await
        .unwrap();
    // Treat a missing version and an empty one the same: both mean "no
    // schema version attached".
    let sv = msg
        .payload
        .metadata
        .schema_version
        .as_deref()
        .unwrap_or(&[]);
    assert!(
        sv.is_empty(),
        "schemaless producer should not carry a non-empty schema_version, got {sv:?}"
    );
    consumer.ack(&msg).await.unwrap();
}
}