use std::string::FromUtf8Error;
use std::sync::Arc;
use futures::channel::{mpsc, oneshot};
use crate::connection::Authentication;
use crate::connection_manager::{
BrokerAddress, ConnectionManager, ConnectionRetryOptions, OperationRetryOptions, TlsOptions,
};
use crate::consumer::{ConsumerBuilder, ConsumerOptions, InitialPosition};
use crate::error::Error;
use crate::executor::Executor;
use crate::message::proto::{self, CommandSendReceipt};
use crate::message::Payload;
use crate::producer::{self, ProducerBuilder, SendFuture};
use crate::service_discovery::ServiceDiscovery;
use futures::StreamExt;
use futures::lock::Mutex;
pub trait DeserializeMessage {
type Output: Sized;
fn deserialize_message(payload: &Payload) -> Self::Output;
}
impl DeserializeMessage for Vec<u8> {
type Output = Self;
fn deserialize_message(payload: &Payload) -> Self::Output {
payload.data.to_vec()
}
}
impl DeserializeMessage for String {
type Output = Result<String, FromUtf8Error>;
fn deserialize_message(payload: &Payload) -> Self::Output {
String::from_utf8(payload.data.to_vec())
}
}
pub trait SerializeMessage {
fn serialize_message(input: Self) -> Result<producer::Message, Error>;
}
impl SerializeMessage for producer::Message {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
Ok(input)
}
}
impl<'a> SerializeMessage for () {
fn serialize_message(_input: Self) -> Result<producer::Message, Error> {
Ok(producer::Message {
..Default::default()
})
}
}
impl<'a> SerializeMessage for &'a [u8] {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
Ok(producer::Message {
payload: input.to_vec(),
..Default::default()
})
}
}
impl SerializeMessage for Vec<u8> {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
Ok(producer::Message {
payload: input,
..Default::default()
})
}
}
impl SerializeMessage for String {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.into_bytes();
Ok(producer::Message {
payload,
..Default::default()
})
}
}
impl<'a> SerializeMessage for &String {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.as_bytes().to_vec();
Ok(producer::Message {
payload,
..Default::default()
})
}
}
impl<'a> SerializeMessage for &'a str {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.as_bytes().to_vec();
Ok(producer::Message {
payload,
..Default::default()
})
}
}
#[derive(Clone)]
pub struct Pulsar<Exe: Executor> {
pub(crate) manager: Arc<ConnectionManager<Exe>>,
service_discovery: Arc<ServiceDiscovery<Exe>>,
producer: Option<mpsc::UnboundedSender<SendMessage>>,
pub(crate) operation_retry_options: OperationRetryOptions,
pub(crate) executor: Arc<Exe>,
}
impl<Exe: Executor> Pulsar<Exe> {
pub(crate) async fn new<S: Into<String>>(
url: S,
auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
connection_retry_parameters: Option<ConnectionRetryOptions>,
operation_retry_parameters: Option<OperationRetryOptions>,
tls_options: Option<TlsOptions>,
executor: Exe,
) -> Result<Self, Error> {
let url: String = url.into();
let executor = Arc::new(executor);
let operation_retry_options = operation_retry_parameters.unwrap_or_default();
let manager = ConnectionManager::new(
url,
auth,
connection_retry_parameters,
operation_retry_options.clone(),
tls_options,
executor.clone(),
)
.await?;
let manager = Arc::new(manager);
let weak_manager = Arc::downgrade(&manager);
let mut interval = executor.interval(std::time::Duration::from_secs(60));
let res = executor.spawn(Box::pin(async move {
while let Some(()) = interval.next().await {
if let Some(strong_manager) = weak_manager.upgrade() {
strong_manager.check_connections().await;
} else {
break;
}
}
}));
if res.is_err() {
error!("the executor could not spawn the check connection task");
return Err(crate::error::ConnectionError::Shutdown.into());
}
let service_discovery = Arc::new(ServiceDiscovery::with_manager(manager.clone()));
let (producer, producer_rx) = mpsc::unbounded();
let mut client = Pulsar {
manager,
service_discovery,
producer: None,
operation_retry_options,
executor,
};
let _ = client
.executor
.spawn(Box::pin(run_producer(client.clone(), producer_rx)));
client.producer = Some(producer);
Ok(client)
}
pub fn builder<S: Into<String>>(url: S, executor: Exe) -> PulsarBuilder<Exe> {
PulsarBuilder {
url: url.into(),
auth_provider: None,
connection_retry_options: None,
operation_retry_options: None,
tls_options: None,
executor,
}
}
pub fn consumer(&self) -> ConsumerBuilder<Exe> {
ConsumerBuilder::new(self)
}
pub fn producer(&self) -> ProducerBuilder<Exe> {
ProducerBuilder::new(self)
}
pub fn reader(&self) -> ConsumerBuilder<Exe> {
ConsumerBuilder::new(self).with_options(
ConsumerOptions::default()
.durable(false)
.with_initial_position(InitialPosition::Latest),
)
}
pub async fn lookup_topic<S: Into<String>>(&self, topic: S) -> Result<BrokerAddress, Error> {
self.service_discovery
.lookup_topic(topic)
.await
.map_err(|e| e.into())
}
pub async fn lookup_partitioned_topic_number<S: Into<String>>(
&self,
topic: S,
) -> Result<u32, Error> {
self.service_discovery
.lookup_partitioned_topic_number(topic)
.await
.map_err(|e| e.into())
}
pub async fn lookup_partitioned_topic<S: Into<String>>(
&self,
topic: S,
) -> Result<Vec<(String, BrokerAddress)>, Error> {
self.service_discovery
.lookup_partitioned_topic(topic)
.await
.map_err(|e| e.into())
}
pub async fn get_topics_of_namespace(
&self,
namespace: String,
mode: proto::command_get_topics_of_namespace::Mode,
) -> Result<Vec<String>, Error> {
let conn = self.manager.get_base_connection().await?;
let topics = conn
.sender()
.get_topics_of_namespace(namespace, mode)
.await?;
Ok(topics.topics)
}
pub async fn send<S: Into<String>, M: SerializeMessage + Sized>(
&self,
topic: S,
message: M,
) -> Result<SendFuture, Error> {
let message = M::serialize_message(message)?;
self.send_raw(message, topic).await
}
async fn send_raw<S: Into<String>>(
&self,
message: producer::Message,
topic: S,
) -> Result<SendFuture, Error> {
let (resolver, future) = oneshot::channel();
self.producer
.as_ref()
.expect("a client without the producer channel should only be used internally")
.unbounded_send(SendMessage {
topic: topic.into(),
message,
resolver,
})
.map_err(|_| Error::Custom("producer unexpectedly disconnected".into()))?;
Ok(SendFuture(future))
}
}
pub struct PulsarBuilder<Exe: Executor> {
url: String,
auth_provider: Option<Box<dyn crate::authentication::Authentication>>,
connection_retry_options: Option<ConnectionRetryOptions>,
operation_retry_options: Option<OperationRetryOptions>,
tls_options: Option<TlsOptions>,
executor: Exe,
}
impl<Exe: Executor> PulsarBuilder<Exe> {
pub fn with_auth(self, auth: Authentication) -> Self {
self.with_auth_provider(Box::new(auth))
}
pub fn with_auth_provider(mut self, auth: Box<dyn crate::authentication::Authentication>) -> Self {
self.auth_provider = Some(auth);
self
}
pub fn with_connection_retry_options(
mut self,
connection_retry_options: ConnectionRetryOptions,
) -> Self {
self.connection_retry_options = Some(connection_retry_options);
self
}
pub fn with_operation_retry_options(
mut self,
operation_retry_options: OperationRetryOptions,
) -> Self {
self.operation_retry_options = Some(operation_retry_options);
self
}
pub fn with_certificate_chain(mut self, certificate_chain: Vec<u8>) -> Self {
match &mut self.tls_options {
Some(tls) => tls.certificate_chain = Some(certificate_chain),
None => {
self.tls_options = Some(TlsOptions {
certificate_chain: Some(certificate_chain),
..Default::default()
})
}
}
self
}
pub fn with_allow_insecure_connection(mut self, allow: bool) -> Self {
match &mut self.tls_options {
Some(tls) => tls.allow_insecure_connection = allow,
None => {
self.tls_options = Some(TlsOptions {
allow_insecure_connection: allow,
..Default::default()
})
}
}
self
}
pub fn with_tls_hostname_verification_enabled(mut self, enabled: bool) -> Self {
match &mut self.tls_options {
Some(tls) => tls.tls_hostname_verification_enabled = enabled,
None => {
self.tls_options = Some(TlsOptions {
tls_hostname_verification_enabled: enabled,
..Default::default()
})
}
}
self
}
pub fn with_certificate_chain_file<P: AsRef<std::path::Path>>(
self,
path: P,
) -> Result<Self, std::io::Error> {
use std::io::Read;
let mut file = std::fs::File::open(path)?;
let mut v = vec![];
file.read_to_end(&mut v)?;
Ok(self.with_certificate_chain(v))
}
pub async fn build(self) -> Result<Pulsar<Exe>, Error> {
let PulsarBuilder {
url,
auth_provider,
connection_retry_options,
operation_retry_options,
tls_options,
executor,
} = self;
Pulsar::new(
url,
auth_provider.map(|p| Arc::new(Mutex::new(p))),
connection_retry_options,
operation_retry_options,
tls_options,
executor,
)
.await
}
}
struct SendMessage {
topic: String,
message: producer::Message,
resolver: oneshot::Sender<Result<CommandSendReceipt, Error>>,
}
async fn run_producer<Exe: Executor>(
client: Pulsar<Exe>,
mut messages: mpsc::UnboundedReceiver<SendMessage>,
) {
let mut producer = client.producer().build_multi_topic();
while let Some(SendMessage {
topic,
message: payload,
resolver,
}) = messages.next().await
{
match producer.send(topic, payload).await {
Ok(future) => {
let _ = client.executor.spawn(Box::pin(async move {
let _ = resolver.send(future.await);
}));
}
Err(e) => {
let _ = resolver.send(Err(e));
}
}
}
}