use std::string::FromUtf8Error;
use std::sync::Arc;
use futures::channel::{oneshot, mpsc};
use crate::connection::Authentication;
use crate::connection_manager::{BackOffOptions, BrokerAddress, ConnectionManager, TlsOptions};
use crate::consumer::ConsumerBuilder;
use crate::error::Error;
use crate::executor::Executor;
use crate::message::proto::{self, CommandSendReceipt};
use crate::message::Payload;
use crate::producer::{self, ProducerBuilder, SendFuture};
use crate::service_discovery::ServiceDiscovery;
use futures::StreamExt;
pub trait DeserializeMessage {
type Output: Sized;
fn deserialize_message(payload: &Payload) -> Self::Output;
}
impl DeserializeMessage for Vec<u8> {
type Output = Self;
fn deserialize_message(payload: &Payload) -> Self::Output {
payload.data.to_vec()
}
}
impl DeserializeMessage for String {
type Output = Result<String, FromUtf8Error>;
fn deserialize_message(payload: &Payload) -> Self::Output {
String::from_utf8(payload.data.to_vec())
}
}
pub trait SerializeMessage {
fn serialize_message(input: Self) -> Result<producer::Message, Error>;
}
impl SerializeMessage for producer::Message {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
Ok(input)
}
}
impl<'a> SerializeMessage for &'a [u8] {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
Ok(producer::Message {
payload: input.to_vec(),
..Default::default()
})
}
}
impl SerializeMessage for Vec<u8> {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
Ok(producer::Message {
payload: input,
..Default::default()
})
}
}
impl SerializeMessage for String {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.into_bytes();
Ok(producer::Message {
payload,
..Default::default()
})
}
}
impl<'a> SerializeMessage for &String {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.as_bytes().to_vec();
Ok(producer::Message {
payload,
..Default::default()
})
}
}
impl<'a> SerializeMessage for &'a str {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.as_bytes().to_vec();
Ok(producer::Message {
payload,
..Default::default()
})
}
}
#[derive(Clone)]
pub struct Pulsar<Exe: Executor> {
pub(crate) manager: Arc<ConnectionManager<Exe>>,
service_discovery: Arc<ServiceDiscovery<Exe>>,
producer: mpsc::UnboundedSender<SendMessage>,
pub(crate) executor: Arc<Exe>,
}
impl<Exe: Executor> Pulsar<Exe> {
pub(crate) async fn new<S: Into<String>>(
url: S,
auth: Option<Authentication>,
backoff_parameters: Option<BackOffOptions>,
tls_options: Option<TlsOptions>,
executor: Exe,
) -> Result<Self, Error> {
let url: String = url.into();
let executor = Arc::new(executor);
let manager = ConnectionManager::new(url, auth, backoff_parameters, tls_options, executor.clone()).await?;
let manager = Arc::new(manager);
let service_discovery = Arc::new(ServiceDiscovery::with_manager(manager.clone()));
let (producer, producer_rx) = mpsc::unbounded();
let client = Pulsar {
manager,
service_discovery,
producer,
executor,
};
let _ = client.executor.spawn(Box::pin(run_producer(client.clone(), producer_rx)));
Ok(client)
}
pub fn builder<S: Into<String>>(url: S, executor: Exe) -> PulsarBuilder<Exe> {
PulsarBuilder {
url: url.into(),
auth: None,
back_off_options: None,
tls_options: None,
executor: executor,
}
}
pub fn consumer(&self) -> ConsumerBuilder<Exe> {
ConsumerBuilder::new(self)
}
pub fn producer(&self) -> ProducerBuilder<Exe> {
ProducerBuilder::new(self)
}
pub async fn lookup_topic<S: Into<String>>(&self, topic: S) -> Result<BrokerAddress, Error> {
self.service_discovery
.lookup_topic(topic)
.await
.map_err(|e| e.into())
}
pub async fn lookup_partitioned_topic_number<S: Into<String>>(
&self,
topic: S,
) -> Result<u32, Error> {
self.service_discovery
.lookup_partitioned_topic_number(topic)
.await
.map_err(|e| e.into())
}
pub async fn lookup_partitioned_topic<S: Into<String>>(
&self,
topic: S,
) -> Result<Vec<(String, BrokerAddress)>, Error> {
self.service_discovery
.lookup_partitioned_topic(topic)
.await
.map_err(|e| e.into())
}
pub async fn get_topics_of_namespace(
&self,
namespace: String,
mode: proto::get_topics::Mode,
) -> Result<Vec<String>, Error> {
let conn = self.manager.get_base_connection().await?;
let topics = conn
.sender()
.get_topics_of_namespace(namespace, mode)
.await?;
Ok(topics.topics)
}
pub async fn send<S: Into<String>, M: SerializeMessage + Sized>(
&self,
topic: S,
message: M,
) -> Result<SendFuture, Error> {
let message = M::serialize_message(message)?;
self.send_raw(message, topic).await
}
async fn send_raw<S: Into<String>>(
&self,
message: producer::Message,
topic: S,
) -> Result<SendFuture, Error> {
let (resolver, future) = oneshot::channel();
self.producer.unbounded_send(SendMessage {
topic: topic.into(),
message,
resolver
}).map_err(|_| Error::Custom("producer unexpectedly disconnected".into()))?;
Ok(SendFuture(future))
}
}
pub struct PulsarBuilder<Exe: Executor> {
url: String,
auth: Option<Authentication>,
back_off_options: Option<BackOffOptions>,
tls_options: Option<TlsOptions>,
executor: Exe,
}
impl<Exe: Executor> PulsarBuilder<Exe> {
pub fn with_auth(self, auth: Authentication) -> Self {
PulsarBuilder {
url: self.url,
auth: Some(auth),
back_off_options: self.back_off_options,
tls_options: self.tls_options,
executor: self.executor,
}
}
pub fn with_back_off_options(self, back_off_options: BackOffOptions) -> Self {
PulsarBuilder {
url: self.url,
auth: self.auth,
back_off_options: Some(back_off_options),
tls_options: self.tls_options,
executor: self.executor,
}
}
pub fn with_certificate_chain(self, certificate_chain: Vec<u8>) -> Self {
PulsarBuilder {
url: self.url,
auth: self.auth,
back_off_options: self.back_off_options,
tls_options: Some(TlsOptions {
certificate_chain: Some(certificate_chain),
}),
executor: self.executor,
}
}
pub fn with_certificate_chain_file<P: AsRef<std::path::Path>>(
self,
path: P,
) -> Result<Self, std::io::Error> {
use std::io::Read;
let mut file = std::fs::File::open(path)?;
let mut v = vec![];
file.read_to_end(&mut v)?;
Ok(self.with_certificate_chain(v))
}
pub async fn build(self) -> Result<Pulsar<Exe>, Error> {
let PulsarBuilder {
url,
auth,
back_off_options,
tls_options,
executor,
} = self;
Pulsar::new(url, auth, back_off_options, tls_options, executor).await
}
}
struct SendMessage {
topic: String,
message: producer::Message,
resolver: oneshot::Sender<Result<CommandSendReceipt, Error>>,
}
async fn run_producer<Exe: Executor>(client: Pulsar<Exe>, mut messages: mpsc::UnboundedReceiver<SendMessage>) {
let mut producer = client.producer().build_multi_topic();
while let Some(SendMessage { topic, message: payload, resolver }) = messages.next().await {
match producer.send(topic, payload).await {
Ok(future) => {
let _ = client.executor.spawn(Box::pin(async move {
let _ = resolver.send(future.await);
}));
}
Err(e) => {
let _ = resolver.send(Err(e));
}
}
}
}