use std::{
collections::{BTreeMap, HashMap, VecDeque},
io::Write,
pin::Pin,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use futures::{
channel::oneshot,
future::try_join_all,
lock::Mutex,
task::{Context, Poll},
Future,
};
use crate::{
client::SerializeMessage,
compression::Compression,
connection::{Connection, SerialId},
error::{ConnectionError, ProducerError},
executor::Executor,
message::{
proto::{self, CommandSendReceipt, EncryptionKeys, Schema},
BatchedMessage,
},
Error, Pulsar,
};
/// Unique identifier of a producer within a connection.
type ProducerId = u64;
/// Name of the producer as assigned by the broker (or chosen by the user).
type ProducerName = String;
pub struct SendFuture(pub(crate) oneshot::Receiver<Result<CommandSendReceipt, Error>>);
impl Future for SendFuture {
    type Output = Result<CommandSendReceipt, Error>;

    /// Polls the underlying oneshot receiver; a dropped sender side is
    /// surfaced as a `ProducerError::Custom` instead of a channel error.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx).map(|received| match received {
            Ok(receipt) => receipt,
            Err(_canceled) => Err(ProducerError::Custom(
                "producer unexpectedly disconnected".into(),
            )
            .into()),
        })
    }
}
/// A user-facing message to be sent by a producer.
#[derive(Debug, Clone, Default)]
pub struct Message {
    /// Serialized payload bytes.
    pub payload: Vec<u8>,
    /// User-defined key/value properties attached to the message.
    pub properties: HashMap<String, String>,
    /// Partition key of the message.
    pub partition_key: ::std::option::Option<String>,
    /// Ordering key bytes (copied into the batched-message metadata).
    pub ordering_key: ::std::option::Option<Vec<u8>>,
    /// Clusters the message should be replicated to.
    pub replicate_to: ::std::vec::Vec<String>,
    /// Application-defined event timestamp.
    pub event_time: ::std::option::Option<u64>,
    /// Version of the schema the payload was written with.
    pub schema_version: ::std::option::Option<Vec<u8>>,
}
/// Wire-level message assembled by the producer: the user-facing [`Message`]
/// fields plus fields managed internally (compression, batching, encryption,
/// scheduled delivery).
#[derive(Debug, Clone, Default)]
pub(crate) struct ProducerMessage {
    pub payload: Vec<u8>,
    pub properties: HashMap<String, String>,
    pub partition_key: ::std::option::Option<String>,
    pub ordering_key: ::std::option::Option<Vec<u8>>,
    pub replicate_to: ::std::vec::Vec<String>,
    /// Compression type actually applied (set by `send_compress`).
    pub compression: ::std::option::Option<i32>,
    /// Payload size before compression (set by `send_compress`).
    pub uncompressed_size: ::std::option::Option<u32>,
    /// Number of messages folded into `payload` when this is a batch.
    pub num_messages_in_batch: ::std::option::Option<i32>,
    pub event_time: ::std::option::Option<u64>,
    pub encryption_keys: ::std::vec::Vec<EncryptionKeys>,
    pub encryption_algo: ::std::option::Option<String>,
    pub encryption_param: ::std::option::Option<Vec<u8>>,
    pub schema_version: ::std::option::Option<Vec<u8>>,
    /// Absolute delivery time in ms since the UNIX epoch
    /// (set by `MessageBuilder::deliver_at` / `delay`).
    pub deliver_at_time: ::std::option::Option<i64>,
}
impl From<Message> for ProducerMessage {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn from(m: Message) -> Self {
ProducerMessage {
payload: m.payload,
properties: m.properties,
partition_key: m.partition_key,
ordering_key: m.ordering_key,
replicate_to: m.replicate_to,
event_time: m.event_time,
schema_version: m.schema_version,
..Default::default()
}
}
}
/// Configuration applied when creating a producer.
#[derive(Clone, Default)]
pub struct ProducerOptions {
    /// Whether messages should be flagged as encrypted.
    pub encrypted: Option<bool>,
    /// Producer-level metadata sent to the broker at creation time.
    pub metadata: BTreeMap<String, String>,
    /// Schema attached to the producer.
    pub schema: Option<Schema>,
    /// When set, messages are queued and flushed in batches of this size.
    pub batch_size: Option<u32>,
    /// Payload compression applied before sending.
    pub compression: Option<Compression>,
    // Raw protocol value; presumably maps to Pulsar's ProducerAccessMode
    // enum — confirm against the proto definition.
    pub access_mode: Option<i32>,
}
/// A producer that can send messages to several topics, lazily creating one
/// [`Producer`] per topic on first use.
pub struct MultiTopicProducer<Exe: Executor> {
    client: Pulsar<Exe>,
    /// Producers created so far, keyed by topic name.
    producers: BTreeMap<String, Producer<Exe>>,
    /// Options applied to every created producer.
    options: ProducerOptions,
    /// Optional name given to every created producer.
    name: Option<String>,
}
impl<Exe: Executor> MultiTopicProducer<Exe> {
    /// Returns the options shared by all per-topic producers.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn options(&self) -> &ProducerOptions {
        &self.options
    }

    /// Lists the topics for which a producer has been created so far.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn topics(&self) -> Vec<String> {
        self.producers.keys().cloned().collect()
    }

    /// Drops the cached producers for every partition of `topic`.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn close_producer<S: Into<String>>(&mut self, topic: S) -> Result<(), Error> {
        for (partition, _) in self.client.lookup_partitioned_topic(topic).await? {
            self.producers.remove(&partition);
        }
        Ok(())
    }

    /// Serializes `message` and sends it on `topic`, creating the per-topic
    /// producer on first use.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn send<T: SerializeMessage + Sized, S: Into<String>>(
        &mut self,
        topic: S,
        message: T,
    ) -> Result<SendFuture, Error> {
        let message = T::serialize_message(message)?;
        let topic = topic.into();
        if !self.producers.contains_key(&topic) {
            let mut builder = self
                .client
                .producer()
                .with_topic(&topic)
                .with_options(self.options.clone());
            if let Some(name) = &self.name {
                builder = builder.with_name(name.clone());
            }
            let producer = builder.build().await?;
            self.producers.insert(topic.clone(), producer);
        }
        // unwrap is safe: the entry was inserted just above if it was missing
        self.producers.get_mut(&topic).unwrap().send(message).await
    }

    /// Sends every message in `messages` on `topic`, in order; fails with
    /// `ProducerError::PartialSend` if any individual send failed.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn send_all<'a, 'b, T, S, I>(
        &mut self,
        topic: S,
        messages: I,
    ) -> Result<Vec<SendFuture>, Error>
    where
        'b: 'a,
        T: 'b + SerializeMessage + Sized,
        I: IntoIterator<Item = T>,
    {
        let topic = topic.into();
        let mut results = Vec::new();
        for message in messages {
            results.push(self.send(&topic, message).await);
        }
        if results.iter().any(|result| result.is_err()) {
            Err(ProducerError::PartialSend(results).into())
        } else {
            Ok(results.into_iter().map(|result| result.unwrap()).collect())
        }
    }
}
/// Handle for sending messages to a single (possibly partitioned) topic.
pub struct Producer<Exe: Executor> {
    inner: ProducerInner<Exe>,
}
impl<Exe: Executor> Producer<Exe> {
    /// Creates a [`ProducerBuilder`] bound to `pulsar`.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn builder(pulsar: &Pulsar<Exe>) -> ProducerBuilder<Exe> {
        ProducerBuilder::new(pulsar)
    }

    /// Returns the topic this producer publishes to.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn topic(&self) -> &str {
        match &self.inner {
            ProducerInner::Single(producer) => producer.topic(),
            ProducerInner::Partitioned(partitioned) => &partitioned.topic,
        }
    }

    /// Lists the partition topics, or `None` for a non-partitioned producer.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn partitions(&self) -> Option<Vec<String>> {
        match &self.inner {
            ProducerInner::Single(_) => None,
            ProducerInner::Partitioned(partitioned) => Some(
                partitioned
                    .producers
                    .iter()
                    .map(|producer| producer.topic().to_owned())
                    .collect(),
            ),
        }
    }

    /// Returns the options this producer was created with.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn options(&self) -> &ProducerOptions {
        match &self.inner {
            ProducerInner::Single(producer) => producer.options(),
            ProducerInner::Partitioned(partitioned) => &partitioned.options,
        }
    }

    /// Starts building a message to send through this producer.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn create_message(&mut self) -> MessageBuilder<(), Exe> {
        MessageBuilder::new(self)
    }

    /// Pings the broker(s) to verify the underlying connection(s) are alive.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn check_connection(&self) -> Result<(), Error> {
        match &self.inner {
            ProducerInner::Single(producer) => producer.check_connection().await,
            ProducerInner::Partitioned(partitioned) => try_join_all(
                partitioned
                    .producers
                    .iter()
                    .map(|producer| producer.check_connection()),
            )
            .await
            .map(drop),
        }
    }

    /// Serializes and sends one message, round-robining over partitions for
    /// a partitioned producer.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn send<T: SerializeMessage + Sized>(
        &mut self,
        message: T,
    ) -> Result<SendFuture, Error> {
        match &mut self.inner {
            ProducerInner::Single(producer) => producer.send(message).await,
            ProducerInner::Partitioned(partitioned) => partitioned.next().send(message).await,
        }
    }

    /// Sends all messages to a single (round-robined) partition, in order;
    /// fails with `ProducerError::PartialSend` if any send failed.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn send_all<T, I>(&mut self, messages: I) -> Result<Vec<SendFuture>, Error>
    where
        T: SerializeMessage,
        I: IntoIterator<Item = T>,
    {
        let producer = match &mut self.inner {
            ProducerInner::Single(producer) => producer,
            ProducerInner::Partitioned(partitioned) => partitioned.next(),
        };
        let mut results = Vec::new();
        for message in messages {
            results.push(producer.send(message).await);
        }
        if results.iter().any(|result| result.is_err()) {
            Err(ProducerError::PartialSend(results).into())
        } else {
            Ok(results.into_iter().map(|result| result.unwrap()).collect())
        }
    }

    /// Flushes the pending batch(es); errors if batching is not enabled.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn send_batch(&mut self) -> Result<(), Error> {
        match &mut self.inner {
            ProducerInner::Single(producer) => producer.send_batch().await,
            ProducerInner::Partitioned(partitioned) => try_join_all(
                partitioned
                    .producers
                    .iter_mut()
                    .map(|producer| producer.send_batch()),
            )
            .await
            .map(drop),
        }
    }

    /// Sends an already-assembled [`ProducerMessage`].
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub(crate) async fn send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error> {
        match &mut self.inner {
            ProducerInner::Single(producer) => producer.send_raw(message).await,
            ProducerInner::Partitioned(partitioned) => partitioned.next().send_raw(message).await,
        }
    }

    /// Closes the broker-side producer(s).
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn close(&mut self) -> Result<(), Error> {
        match &mut self.inner {
            ProducerInner::Single(producer) => producer.close().await,
            ProducerInner::Partitioned(partitioned) => {
                try_join_all(partitioned.producers.iter().map(|producer| producer.close()))
                    .await
                    .map(drop)
            }
        }
    }
}
/// Dispatch between a single-partition producer and a round-robin set of
/// per-partition producers.
enum ProducerInner<Exe: Executor> {
    Single(TopicProducer<Exe>),
    Partitioned(PartitionedProducer<Exe>),
}
/// Round-robins sends across one `TopicProducer` per partition.
struct PartitionedProducer<Exe: Executor> {
    // Rotated on each `next()` call; the front producer is used next.
    producers: VecDeque<TopicProducer<Exe>>,
    topic: String,
    options: ProducerOptions,
}
impl<Exe: Executor> PartitionedProducer<Exe> {
    /// Advances the round-robin cursor and returns the producer to use for
    /// the next send.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn next(&mut self) -> &mut TopicProducer<Exe> {
        self.producers.rotate_left(1);
        // unwrap is safe: the builder never constructs an empty partition set
        self.producers.get_mut(0).unwrap()
    }
}
/// A producer bound to one concrete (partition) topic on one connection.
struct TopicProducer<Exe: Executor> {
    client: Pulsar<Exe>,
    connection: Arc<Connection<Exe>>,
    id: ProducerId,
    name: ProducerName,
    topic: String,
    /// Generator of sequence ids for outgoing messages.
    message_id: SerialId,
    /// Queue of pending messages; `None` when batching is disabled.
    batch: Option<Mutex<Batch>>,
    compression: Option<Compression>,
    /// Dropping this sender wakes a background task that closes the
    /// broker-side producer.
    drop_signal: oneshot::Sender<()>,
    options: ProducerOptions,
}
impl<Exe: Executor> TopicProducer<Exe> {
/// Registers a producer on the broker over `connection` and returns the
/// client-side handle.
///
/// Retries `create_producer` on `ServiceNotReady`, `ProducerBusy` and I/O
/// timeouts — re-resolving the topic and taking a fresh connection each time —
/// until `operation_retry_options.max_retries` is exhausted.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub(crate) async fn from_connection<S: Into<String>>(
    client: Pulsar<Exe>,
    mut connection: Arc<Connection<Exe>>,
    topic: S,
    name: Option<String>,
    options: ProducerOptions,
) -> Result<Self, Error> {
    let topic = topic.into();
    // Producer ids only need to be unique per connection.
    let producer_id = rand::random();
    let sequence_ids = SerialId::new();
    // NOTE(review): this clone is redundant — `topic` was just created above.
    let topic = topic.clone();
    let batch_size = options.batch_size;
    let compression = options.compression.clone();
    let producer_name: ProducerName;
    let mut current_retries = 0u32;
    let start = std::time::Instant::now();
    let operation_retry_options = client.operation_retry_options.clone();
    loop {
        let connection_sender = connection.sender();
        match connection_sender
            .create_producer(topic.clone(), producer_id, name.clone(), options.clone())
            .await
            .map_err(|e| {
                error!("TopicProducer::from_connection error[{}]: {:?}", line!(), e);
                e
            }) {
            Ok(partial_success) => {
                // The broker may answer before the producer is ready (e.g.
                // while waiting for exclusive access); block until it is.
                if let Some(producer_ready) = partial_success.producer_ready {
                    if !producer_ready {
                        trace!("producer is still waiting for exclusive access");
                        let result = connection_sender
                            .wait_for_exclusive_access(partial_success.request_id)
                            .await;
                        trace!("result is received: {:?}", result);
                    }
                }
                producer_name = partial_success.producer_name;
                if current_retries > 0 {
                    let dur = (std::time::Instant::now() - start).as_secs();
                    log::info!(
                        "producer({}) success after {} retries over {} seconds",
                        topic,
                        current_retries + 1,
                        dur
                    );
                }
                break;
            }
            // Transient broker condition: delay, re-lookup, reconnect, retry.
            Err(ConnectionError::PulsarError(
                Some(proto::ServerError::ServiceNotReady),
                text,
            )) => {
                if operation_retry_options.max_retries.is_none()
                    || operation_retry_options.max_retries.unwrap() > current_retries
                {
                    error!("create_producer({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}",
                        topic, operation_retry_options.retry_delay.as_millis(),
                        operation_retry_options.max_retries, text.unwrap_or_default());
                    current_retries += 1;
                    client
                        .executor
                        .delay(operation_retry_options.retry_delay)
                        .await;
                    let addr = client.lookup_topic(&topic).await?;
                    connection = client.manager.get_connection(&addr).await?;
                    continue;
                } else {
                    error!("create_producer({}) reached max retries", topic);
                    return Err(ConnectionError::PulsarError(
                        Some(proto::ServerError::ServiceNotReady),
                        text,
                    )
                    .into());
                }
            }
            // Same retry policy when another producer still holds the name.
            Err(ConnectionError::PulsarError(Some(proto::ServerError::ProducerBusy), text)) => {
                if operation_retry_options.max_retries.is_none()
                    || operation_retry_options.max_retries.unwrap() > current_retries
                {
                    error!("create_producer({}) answered ProducerBusy, retrying request after {}ms (max_retries = {:?}): {}",
                        topic, operation_retry_options.retry_delay.as_millis(),
                        operation_retry_options.max_retries, text.unwrap_or_default());
                    current_retries += 1;
                    client
                        .executor
                        .delay(operation_retry_options.retry_delay)
                        .await;
                    let addr = client.lookup_topic(&topic).await?;
                    connection = client.manager.get_connection(&addr).await?;
                    continue;
                } else {
                    error!("create_producer({}) reached max retries", topic);
                    return Err(ConnectionError::PulsarError(
                        Some(proto::ServerError::ProducerBusy),
                        text,
                    )
                    .into());
                }
            }
            Err(ConnectionError::Io(e)) => {
                // Only timeouts are retried; other I/O errors are fatal.
                if e.kind() != std::io::ErrorKind::TimedOut {
                    warn!("send_inner got io error: {:?}", e);
                    return Err(ProducerError::Connection(ConnectionError::Io(e)).into());
                } else if operation_retry_options.max_retries.is_none()
                    || operation_retry_options.max_retries.unwrap() > current_retries
                {
                    error!(
                        "create_producer({}) TimedOut, retrying request after {}ms (max_retries = {:?})",
                        topic, operation_retry_options.retry_delay.as_millis(),
                        operation_retry_options.max_retries
                    );
                    current_retries += 1;
                    client
                        .executor
                        .delay(operation_retry_options.retry_delay)
                        .await;
                    let addr = client.lookup_topic(&topic).await?;
                    connection = client.manager.get_connection(&addr).await?;
                    continue;
                } else {
                    error!("create_producer({}) reached max retries", topic);
                    return Err(ProducerError::Connection(ConnectionError::Io(e)).into());
                }
            }
            Err(e) => return Err(Error::Connection(e)),
        }
    }
    // Close-on-drop: when `drop_signal` is dropped, the receiver resolves and
    // the spawned task closes the broker-side producer.
    // NOTE(review): unlike `reconnect`, this task holds a strong `Arc` to the
    // connection, keeping it alive until the producer is dropped — confirm
    // whether a `Weak` reference (as used in `reconnect`) was intended.
    let (_drop_signal, drop_receiver) = oneshot::channel::<()>();
    let conn = connection.clone();
    let _ = client.executor.spawn(Box::pin(async move {
        let _res = drop_receiver.await;
        let _ = conn.sender().close_producer(producer_id).await;
    }));
    Ok(TopicProducer {
        client,
        connection,
        id: producer_id,
        name: producer_name,
        topic,
        message_id: sequence_ids,
        batch: batch_size.map(Batch::new).map(Mutex::new),
        compression,
        drop_signal: _drop_signal,
        options,
    })
}
/// The fully-qualified topic this producer publishes to.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn topic(&self) -> &str {
    self.topic.as_str()
}
/// The options this producer was created with.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn options(&self) -> &ProducerOptions {
    &self.options
}
/// Verifies the connection is alive by performing a ping round-trip.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn check_connection(&self) -> Result<(), Error> {
    Ok(self.connection.sender().send_ping().await?)
}
/// Serializes `message` and sends it (possibly via the batch queue).
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn send<T: SerializeMessage + Sized>(&mut self, message: T) -> Result<SendFuture, Error> {
    let serialized = T::serialize_message(message)?;
    self.send_raw(serialized.into()).await
}
/// Flushes the batch queue immediately, regardless of how full it is.
///
/// All queued messages are serialized into one batched payload, sent as a
/// single `ProducerMessage`, and every pending `SendFuture` is resolved with
/// (a clone of) the resulting receipt or error.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn send_batch(&mut self) -> Result<(), Error> {
    let batch = match self.batch.as_ref() {
        Some(batch) => batch,
        None => return Err(ProducerError::Custom("not a batching producer".to_string()).into()),
    };
    let mut payload: Vec<u8> = Vec::new();
    let mut resolvers = Vec::new();
    // Drain the queue while holding the batch lock, but send outside of it.
    {
        let batch = batch.lock().await;
        for (tx, message) in batch.get_messages().await {
            resolvers.push(tx);
            message.serialize(&mut payload);
        }
    }
    if resolvers.is_empty() {
        return Ok(());
    }
    let batched_count = resolvers.len();
    let message = ProducerMessage {
        payload,
        num_messages_in_batch: Some(batched_count as i32),
        ..Default::default()
    };
    trace!("sending a batched message of size {}", batched_count);
    // The single receipt is shared (via Arc on the error side) between all
    // the messages that were folded into the batch.
    let send_receipt = self.send_compress(message).await.map_err(Arc::new);
    for resolver in resolvers {
        let _ = resolver.send(
            send_receipt
                .clone()
                .map_err(|e| ProducerError::Batch(e).into()),
        );
    }
    Ok(())
}
/// Sends a wire-level message, returning a future that resolves with the
/// broker's receipt.
///
/// Without batching the message is sent immediately; with batching it is
/// queued, and the whole batch is flushed once it reaches the configured
/// size.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub(crate) async fn send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error> {
    let (tx, rx) = oneshot::channel();
    if self.batch.is_none() {
        // Non-batching producer: send right away and resolve the future.
        let receipt = self.send_compress(message).await?;
        let _ = tx.send(Ok(receipt));
        return Ok(SendFuture(rx));
    }
    let mut payload: Vec<u8> = Vec::new();
    let mut resolvers = Vec::new();
    let mut batched_count = 0i32;
    // Queue the message; if that filled the batch, drain it while still
    // holding the lock and send outside of it.
    {
        let batch = self.batch.as_ref().unwrap().lock().await;
        batch.push_back((tx, message)).await;
        if batch.is_full().await {
            for (sender, batched) in batch.get_messages().await {
                resolvers.push(sender);
                batched.serialize(&mut payload);
                batched_count += 1;
            }
        }
    }
    if batched_count > 0 {
        let message = ProducerMessage {
            payload,
            num_messages_in_batch: Some(batched_count),
            ..Default::default()
        };
        let send_receipt = self.send_compress(message).await.map_err(Arc::new);
        trace!("sending a batched message of size {}", batched_count);
        for sender in resolvers.drain(..) {
            let _ = sender.send(
                send_receipt
                    .clone()
                    .map_err(|e| ProducerError::Batch(e).into()),
            );
        }
    }
    Ok(SendFuture(rx))
}
/// Compresses `message.payload` according to the configured compression
/// before handing the message to `send_inner`.
///
/// On compression, records the original payload size in `uncompressed_size`
/// and the algorithm in `compression` so consumers can decompress.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn send_compress(
    &mut self,
    mut message: ProducerMessage,
) -> Result<proto::CommandSendReceipt, Error> {
    let compressed_message = match self.compression.clone() {
        None | Some(Compression::None) => message,
        #[cfg(feature = "lz4")]
        Some(Compression::Lz4(compression)) => {
            let compressed_payload: Vec<u8> =
                lz4::block::compress(&message.payload[..], Some(compression.mode), false)
                    .map_err(ProducerError::Io)?;
            message.uncompressed_size = Some(message.payload.len() as u32);
            message.payload = compressed_payload;
            message.compression = Some(proto::CompressionType::Lz4.into());
            message
        }
        #[cfg(feature = "flate2")]
        Some(Compression::Zlib(compression)) => {
            let mut e = flate2::write::ZlibEncoder::new(Vec::new(), compression.level);
            e.write_all(&message.payload[..])
                .map_err(ProducerError::Io)?;
            let compressed_payload = e.finish().map_err(ProducerError::Io)?;
            message.uncompressed_size = Some(message.payload.len() as u32);
            message.payload = compressed_payload;
            message.compression = Some(proto::CompressionType::Zlib.into());
            message
        }
        #[cfg(feature = "zstd")]
        Some(Compression::Zstd(compression)) => {
            let compressed_payload = zstd::encode_all(&message.payload[..], compression.level)
                .map_err(ProducerError::Io)?;
            message.uncompressed_size = Some(message.payload.len() as u32);
            message.payload = compressed_payload;
            message.compression = Some(proto::CompressionType::Zstd.into());
            message
        }
        #[cfg(feature = "snap")]
        Some(Compression::Snappy(..)) => {
            let compressed_payload: Vec<u8> = Vec::new();
            let mut encoder = snap::write::FrameEncoder::new(compressed_payload);
            // BUG FIX: previously used `write`, which may perform a partial
            // write and silently truncate the payload; `write_all` (as in the
            // zlib branch above) guarantees the whole payload is encoded.
            encoder
                .write_all(&message.payload[..])
                .map_err(ProducerError::Io)?;
            let compressed_payload = encoder
                .into_inner()
                .map_err(|e| {
                    std::io::Error::new(
                        std::io::ErrorKind::Other,
                        format!("Snappy compression error: {:?}", e),
                    )
                })
                .map_err(ProducerError::Io)?;
            message.uncompressed_size = Some(message.payload.len() as u32);
            message.payload = compressed_payload;
            message.compression = Some(proto::CompressionType::Snappy.into());
            message
        }
    };
    self.send_inner(compressed_message).await
}
/// Pushes one message over the wire, transparently reconnecting and
/// resending on disconnection or I/O timeout.
///
/// Non-timeout I/O errors and all other connection errors are returned to
/// the caller.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn send_inner(
    &mut self,
    message: ProducerMessage,
) -> Result<proto::CommandSendReceipt, Error> {
    loop {
        let attempt = message.clone();
        let outcome = self
            .connection
            .sender()
            .send(self.id, self.name.clone(), self.message_id.get(), attempt)
            .await;
        match outcome {
            Ok(receipt) => return Ok(receipt),
            // A dropped connection and an I/O timeout are both handled by
            // reconnecting below and retrying the send.
            Err(ConnectionError::Disconnected) => {}
            Err(ConnectionError::Io(e)) if e.kind() == std::io::ErrorKind::TimedOut => {}
            Err(ConnectionError::Io(e)) => {
                error!("send_inner got io error: {:?}", e);
                return Err(ProducerError::Connection(ConnectionError::Io(e)).into());
            }
            Err(e) => {
                error!("send_inner got error: {:?}", e);
                return Err(ProducerError::Connection(e).into());
            }
        }
        error!(
            "send_inner: connection {} disconnected",
            self.connection.id()
        );
        self.reconnect().await?;
    }
}
/// Re-establishes the producer after a broken connection: resolves the topic
/// to a (possibly different) broker, opens a fresh connection, and re-issues
/// `create_producer` with the same id and name, retrying transient failures
/// up to `operation_retry_options.max_retries`.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn reconnect(&mut self) -> Result<(), Error> {
    debug!("reconnecting producer for topic: {}", self.topic);
    // Replacing the drop signal fires the old one, telling the background
    // task spawned at creation time to close the old broker-side producer.
    let (drop_signal, _) = oneshot::channel::<()>();
    let old_signal = std::mem::replace(&mut self.drop_signal, drop_signal);
    let _ = old_signal.send(());
    let broker_address = self.client.lookup_topic(&self.topic).await?;
    let conn = self.client.manager.get_connection(&broker_address).await?;
    self.connection = conn;
    warn!(
        "Retry #0 -> reconnecting producer {:#} using connection {:#} to broker {:#} to topic {:#}",
        self.id,
        self.connection.id(),
        broker_address.url,
        self.topic
    );
    let topic = self.topic.clone();
    let batch_size = self.options.batch_size;
    let mut current_retries = 0u32;
    let start = std::time::Instant::now();
    let operation_retry_options = self.client.operation_retry_options.clone();
    loop {
        match self
            .connection
            .sender()
            .create_producer(
                topic.clone(),
                self.id,
                Some(self.name.clone()),
                self.options.clone(),
            )
            .await
            .map_err(|e| {
                error!("TopicProducer::create_producer error[{}]: {:?}", line!(), e);
                e
            }) {
            Ok(_success) => {
                if current_retries > 0 {
                    let dur = (std::time::Instant::now() - start).as_secs();
                    log::info!(
                        "producer({}) success after {} retries over {} seconds",
                        topic,
                        current_retries + 1,
                        dur
                    );
                }
                break;
            }
            // Transient broker condition: delay, re-lookup, reconnect, retry.
            Err(ConnectionError::PulsarError(
                Some(proto::ServerError::ServiceNotReady),
                text,
            )) => {
                if operation_retry_options.max_retries.is_none()
                    || operation_retry_options.max_retries.unwrap() > current_retries
                {
                    warn!("create_producer({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}",
                        topic, operation_retry_options.retry_delay.as_millis(),
                        operation_retry_options.max_retries, text.unwrap_or_default());
                    current_retries += 1;
                    self.client
                        .executor
                        .delay(operation_retry_options.retry_delay)
                        .await;
                    let addr = self.client.lookup_topic(&topic).await?;
                    self.connection = self.client.manager.get_connection(&addr).await?;
                    warn!(
                        "Retry #{} -> reconnecting producer {:#} using connection {:#} to broker {:#} to topic {:#}",
                        current_retries,
                        self.id,
                        self.connection.id(),
                        broker_address.url,
                        self.topic
                    );
                    continue;
                } else {
                    error!("create_producer({}) reached max retries", topic);
                    return Err(ConnectionError::PulsarError(
                        Some(proto::ServerError::ServiceNotReady),
                        text,
                    )
                    .into());
                }
            }
            // Same retry policy while the previous producer incarnation still
            // holds the name on the broker.
            Err(ConnectionError::PulsarError(Some(proto::ServerError::ProducerBusy), text)) => {
                if operation_retry_options.max_retries.is_none()
                    || operation_retry_options.max_retries.unwrap() > current_retries
                {
                    warn!("create_producer({}) answered ProducerBusy, retrying request after {}ms (max_retries = {:?}): {}",
                        topic, operation_retry_options.retry_delay.as_millis(),
                        operation_retry_options.max_retries, text.unwrap_or_default());
                    current_retries += 1;
                    self.client
                        .executor
                        .delay(operation_retry_options.retry_delay)
                        .await;
                    let addr = self.client.lookup_topic(&topic).await?;
                    self.connection = self.client.manager.get_connection(&addr).await?;
                    warn!(
                        "Retry #{} -> reconnecting producer {:#} using connection {:#} to broker {:#} to topic {:#}",
                        current_retries,
                        self.id,
                        self.connection.id(),
                        broker_address.url,
                        self.topic
                    );
                    continue;
                } else {
                    error!("create_producer({}) reached max retries", topic);
                    return Err(ConnectionError::PulsarError(
                        Some(proto::ServerError::ProducerBusy),
                        text,
                    )
                    .into());
                }
            }
            Err(ConnectionError::Io(e)) => {
                // Only timeouts are retried; other I/O errors are fatal.
                if e.kind() != std::io::ErrorKind::TimedOut {
                    error!("send_inner got io error: {:?}", e);
                    return Err(ProducerError::Connection(ConnectionError::Io(e)).into());
                } else if operation_retry_options.max_retries.is_none()
                    || operation_retry_options.max_retries.unwrap() > current_retries
                {
                    warn!("create_producer({}) TimedOut, retrying request after {}ms (max_retries = {:?})",
                        topic, operation_retry_options.retry_delay.as_millis(), operation_retry_options.max_retries);
                    current_retries += 1;
                    self.client
                        .executor
                        .delay(operation_retry_options.retry_delay)
                        .await;
                    let addr = self.client.lookup_topic(&topic).await?;
                    self.connection = self.client.manager.get_connection(&addr).await?;
                    warn!(
                        "Retry #{} -> reconnecting producer {:#} using connection {:#} to broker {:#} to topic {:#}",
                        current_retries,
                        self.id,
                        self.connection.id(),
                        broker_address.url,
                        self.topic
                    );
                    continue;
                } else {
                    error!("create_producer({}) reached max retries", topic);
                    return Err(Error::Connection(ConnectionError::Io(e)));
                }
            }
            Err(e) => {
                error!("reconnect error[{:?}]: {:?}", line!(), e);
                return Err(Error::Connection(e));
            }
        }
    }
    // Re-arm the close-on-drop task. A weak reference is used so this task
    // does not keep the connection alive by itself.
    let (_drop_signal, drop_receiver) = oneshot::channel::<()>();
    let batch = batch_size.map(Batch::new).map(Mutex::new);
    let conn = Arc::downgrade(&self.connection);
    let producer_id = self.id;
    let _ = self.client.executor.spawn(Box::pin(async move {
        let _res = drop_receiver.await;
        match conn.upgrade() {
            None => {
                debug!("Connection already dropped, no weak reference remaining")
            }
            Some(connection) => {
                debug!("Closing producers of connection {}", connection.id());
                let _ = connection.sender().close_producer(producer_id).await;
            }
        }
    }));
    // NOTE(review): the batch queue is replaced with a fresh, empty one here,
    // so any messages queued before the reconnect are discarded — confirm
    // this is intended.
    self.batch = batch;
    self.drop_signal = _drop_signal;
    Ok(())
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn close(&self) -> Result<(), Error> {
let connection = Arc::downgrade(&self.connection);
match connection.upgrade() {
None => {
info!("Connection already gone");
Ok(())
}
Some(connection) => {
info!(
"Closing connection #{} of producer[{}]",
self.connection.id(),
self.name
);
connection
.sender()
.close_producer(self.id)
.await
.map(drop)
.map_err(Error::Connection)
}
}
}
}
/// Builder for [`Producer`] / [`MultiTopicProducer`] instances.
#[derive(Clone)]
pub struct ProducerBuilder<Exe: Executor> {
    pulsar: Pulsar<Exe>,
    /// Required for `build`; not used by `build_multi_topic`.
    topic: Option<String>,
    name: Option<String>,
    producer_options: Option<ProducerOptions>,
}
impl<Exe: Executor> ProducerBuilder<Exe> {
    /// Creates a builder with no topic, name or options set.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn new(pulsar: &Pulsar<Exe>) -> Self {
        ProducerBuilder {
            pulsar: pulsar.clone(),
            topic: None,
            name: None,
            producer_options: None,
        }
    }

    /// Sets the topic to produce to (mandatory for [`build`](Self::build)).
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn with_topic<S: Into<String>>(mut self, topic: S) -> Self {
        self.topic = Some(topic.into());
        self
    }

    /// Sets the producer name.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn with_name<S: Into<String>>(mut self, name: S) -> Self {
        self.name = Some(name.into());
        self
    }

    /// Sets the producer options.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn with_options(mut self, options: ProducerOptions) -> Self {
        self.producer_options = Some(options);
        self
    }

    /// Looks up the topic's partitions, creates one broker-side producer per
    /// partition in parallel, and wraps them into a single- or partitioned
    /// producer.
    ///
    /// # Errors
    /// Fails if no topic was set, if the lookup fails or returns no
    /// partitions, or if any broker-side producer creation fails.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn build(self) -> Result<Producer<Exe>, Error> {
        let ProducerBuilder {
            pulsar,
            topic,
            name,
            producer_options,
        } = self;
        let topic = topic.ok_or_else(|| Error::Custom("topic not set".to_string()))?;
        let options = producer_options.unwrap_or_default();
        let partitions = pulsar.lookup_partitioned_topic(&topic).await?;
        let producers: Vec<TopicProducer<Exe>> =
            try_join_all(partitions.into_iter().map(|(partition, addr)| {
                let name = name.clone();
                let options = options.clone();
                let pulsar = pulsar.clone();
                async move {
                    let conn = pulsar.manager.get_connection(&addr).await?;
                    let producer =
                        TopicProducer::from_connection(pulsar, conn, partition, name, options)
                            .await?;
                    Ok::<TopicProducer<Exe>, Error>(producer)
                }
            }))
            .await?;
        let inner = match producers.len() {
            0 => {
                return Err(Error::Custom(format!(
                    "Unexpected error: Partition lookup returned no topics for {}",
                    topic
                )))
            }
            1 => ProducerInner::Single(producers.into_iter().next().unwrap()),
            _ => {
                // Pre-rotate right so the first `next()` call (which rotates
                // left) lands on the first partition.
                let mut producers = VecDeque::from(producers);
                producers.rotate_right(1);
                ProducerInner::Partitioned(PartitionedProducer {
                    producers,
                    topic,
                    options,
                })
            }
        };
        Ok(Producer { inner })
    }

    /// Builds a producer that lazily creates one producer per topic on send.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn build_multi_topic(self) -> MultiTopicProducer<Exe> {
        MultiTopicProducer {
            client: self.pulsar,
            producers: Default::default(),
            options: self.producer_options.unwrap_or_default(),
            name: self.name,
        }
    }
}
/// Accumulates batched messages, each paired with the oneshot used to
/// resolve its `SendFuture`, until the queue is drained and flushed.
struct Batch {
    /// Number of messages at which the batch is considered full.
    pub length: u32,
    #[allow(clippy::type_complexity)]
    pub storage: Mutex<
        VecDeque<(
            oneshot::Sender<Result<proto::CommandSendReceipt, Error>>,
            BatchedMessage,
        )>,
    >,
}
impl Batch {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn new(length: u32) -> Batch {
Batch {
length,
storage: Mutex::new(VecDeque::with_capacity(length as usize)),
}
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn is_full(&self) -> bool {
self.storage.lock().await.len() >= self.length as usize
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn push_back(
&self,
msg: (
oneshot::Sender<Result<proto::CommandSendReceipt, Error>>,
ProducerMessage,
),
) {
let (tx, message) = msg;
let properties = message
.properties
.into_iter()
.map(|(key, value)| proto::KeyValue { key, value })
.collect();
let batched = BatchedMessage {
metadata: proto::SingleMessageMetadata {
properties,
partition_key: message.partition_key,
ordering_key: message.ordering_key,
payload_size: message.payload.len() as i32,
event_time: message.event_time,
..Default::default()
},
payload: message.payload,
};
self.storage.lock().await.push_back((tx, batched))
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn get_messages(
&self,
) -> Vec<(
oneshot::Sender<Result<proto::CommandSendReceipt, Error>>,
BatchedMessage,
)> {
self.storage.lock().await.drain(..).collect()
}
}
/// Builder collecting message content and metadata before sending through a
/// [`Producer`].
pub struct MessageBuilder<'a, T, Exe: Executor> {
    producer: &'a mut Producer<Exe>,
    properties: HashMap<String, String>,
    partition_key: Option<String>,
    ordering_key: Option<Vec<u8>>,
    /// Absolute delivery time in ms since the UNIX epoch, if scheduled.
    deliver_at_time: Option<i64>,
    event_time: Option<u64>,
    content: T,
}
impl<'a, Exe: Executor> MessageBuilder<'a, (), Exe> {
    /// Starts an empty builder — no content, no metadata — for `producer`.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn new(producer: &'a mut Producer<Exe>) -> Self {
        MessageBuilder {
            producer,
            content: (),
            properties: HashMap::new(),
            partition_key: None,
            ordering_key: None,
            event_time: None,
            deliver_at_time: None,
        }
    }
}
impl<'a, T, Exe: Executor> MessageBuilder<'a, T, Exe> {
    /// Replaces the message content, keeping all other builder state.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn with_content<C>(self, content: C) -> MessageBuilder<'a, C, Exe> {
        MessageBuilder {
            producer: self.producer,
            properties: self.properties,
            partition_key: self.partition_key,
            ordering_key: self.ordering_key,
            deliver_at_time: self.deliver_at_time,
            event_time: self.event_time,
            content,
        }
    }

    /// Sets the partition key of the message.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn with_partition_key<S: Into<String>>(mut self, partition_key: S) -> Self {
        self.partition_key = Some(partition_key.into());
        self
    }

    /// Sets the ordering key of the message.
    ///
    /// BUG FIX: this previously assigned `self.partition_key` instead of
    /// `self.ordering_key`, so the ordering key could never be set through
    /// this method.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn with_ordering_key<S: Into<String>>(mut self, ordering_key: S) -> Self {
        let key: String = ordering_key.into();
        self.ordering_key = Some(key.into_bytes());
        self
    }

    /// Sets the partition key of the message
    /// (alias of [`with_partition_key`](Self::with_partition_key)).
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn with_key<S: Into<String>>(mut self, partition_key: S) -> Self {
        self.partition_key = Some(partition_key.into());
        self
    }

    /// Attaches a user-defined property to the message.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn with_property<S1: Into<String>, S2: Into<String>>(mut self, key: S1, value: S2) -> Self {
        self.properties.insert(key.into(), value.into());
        self
    }

    /// Schedules delivery at an absolute time.
    ///
    /// # Errors
    /// Fails if `date` is before the UNIX epoch.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn deliver_at(mut self, date: SystemTime) -> Result<Self, std::time::SystemTimeError> {
        self.deliver_at_time = Some(date.duration_since(UNIX_EPOCH)?.as_millis() as i64);
        Ok(self)
    }

    /// Schedules delivery after a delay relative to now.
    ///
    /// A leftover debug `println!` (printing the computed delivery time to
    /// stdout on every call) was removed here.
    ///
    /// # Errors
    /// Fails if the computed delivery date is before the UNIX epoch.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn delay(mut self, delay: Duration) -> Result<Self, std::time::SystemTimeError> {
        let date = SystemTime::now() + delay;
        self.deliver_at_time = Some(date.duration_since(UNIX_EPOCH)?.as_millis() as i64);
        Ok(self)
    }

    /// Sets the application-defined event timestamp of the message.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub fn event_time(mut self, event_time: u64) -> Self {
        self.event_time = Some(event_time);
        self
    }
}
impl<'a, T: SerializeMessage + Sized, Exe: Executor> MessageBuilder<'a, T, Exe> {
    /// Serializes the content, applies the metadata collected by the builder,
    /// and sends the message through the producer.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
    pub async fn send(self) -> Result<SendFuture, Error> {
        let mut message = T::serialize_message(self.content)?;
        message.properties = self.properties;
        message.partition_key = self.partition_key;
        message.ordering_key = self.ordering_key;
        message.event_time = self.event_time;
        let mut producer_message: ProducerMessage = message.into();
        producer_message.deliver_at_time = self.deliver_at_time;
        self.producer.send_raw(producer_message).await
    }
}