use native_tls::Certificate;
use proto::MessageIdData;
use rand::{thread_rng, Rng};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::Duration;
use futures::{
self,
channel::{mpsc, oneshot},
future::{select, Either},
pin_mut,
task::{Context, Poll},
Future, FutureExt, Sink, SinkExt, Stream, StreamExt,
};
use url::Url;
use crate::consumer::ConsumerOptions;
use crate::error::{ConnectionError, SharedError, AuthenticationError};
use crate::executor::{Executor, ExecutorKind};
use crate::message::{
proto::{self, command_subscribe::SubType},
BaseCommand, Codec, Message,
};
use crate::producer::{self, ProducerOptions};
use async_trait::async_trait;
use futures::lock::Mutex;
pub(crate) enum Register {
Request {
key: RequestKey,
resolver: oneshot::Sender<Message>,
},
Consumer {
consumer_id: u64,
resolver: mpsc::UnboundedSender<Message>,
},
Ping {
resolver: oneshot::Sender<()>,
},
}
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq)]
pub enum RequestKey {
RequestId(u64),
ProducerSend { producer_id: u64, sequence_id: u64 },
Consumer { consumer_id: u64 },
CloseConsumer { consumer_id: u64, request_id: u64 },
}
#[derive(Clone)]
pub struct Authentication {
pub name: String,
pub data: Vec<u8>,
}
#[async_trait]
impl crate::authentication::Authentication for Authentication {
fn auth_method_name(&self) -> String {
self.name.clone()
}
async fn initialize(&mut self) -> Result<(), AuthenticationError> {
Ok(())
}
async fn auth_data(&mut self) -> Result<Vec<u8>, AuthenticationError> {
Ok(self.data.clone())
}
}
pub(crate) struct Receiver<S: Stream<Item = Result<Message, ConnectionError>>> {
inbound: Pin<Box<S>>,
outbound: mpsc::UnboundedSender<Message>,
error: SharedError,
pending_requests: BTreeMap<RequestKey, oneshot::Sender<Message>>,
consumers: BTreeMap<u64, mpsc::UnboundedSender<Message>>,
received_messages: BTreeMap<RequestKey, Message>,
registrations: Pin<Box<mpsc::UnboundedReceiver<Register>>>,
shutdown: Pin<Box<oneshot::Receiver<()>>>,
ping: Option<oneshot::Sender<()>>,
}
impl<S: Stream<Item = Result<Message, ConnectionError>>> Receiver<S> {
pub fn new(
inbound: S,
outbound: mpsc::UnboundedSender<Message>,
error: SharedError,
registrations: mpsc::UnboundedReceiver<Register>,
shutdown: oneshot::Receiver<()>,
) -> Receiver<S> {
Receiver {
inbound: Box::pin(inbound),
outbound,
error,
pending_requests: BTreeMap::new(),
received_messages: BTreeMap::new(),
consumers: BTreeMap::new(),
registrations: Box::pin(registrations),
shutdown: Box::pin(shutdown),
ping: None,
}
}
}
impl<S: Stream<Item = Result<Message, ConnectionError>>> Future for Receiver<S> {
type Output = Result<(), ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.shutdown.as_mut().poll(cx) {
Poll::Ready(Ok(())) | Poll::Ready(Err(futures::channel::oneshot::Canceled)) => {
return Poll::Ready(Err(()))
}
Poll::Pending => {}
}
loop {
match self.registrations.as_mut().poll_next(cx) {
Poll::Ready(Some(Register::Request { key, resolver })) => {
match self.received_messages.remove(&key) {
Some(msg) => {
let _ = resolver.send(msg);
}
None => {
self.pending_requests.insert(key, resolver);
}
}
}
Poll::Ready(Some(Register::Consumer {
consumer_id,
resolver,
})) => {
self.consumers.insert(consumer_id, resolver);
}
Poll::Ready(Some(Register::Ping { resolver })) => {
self.ping = Some(resolver);
}
Poll::Ready(None) => {
self.error.set(ConnectionError::Disconnected);
return Poll::Ready(Err(()));
}
Poll::Pending => break,
}
}
loop {
match self.inbound.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(msg))) => match msg {
Message {
command: BaseCommand { ping: Some(_), .. },
..
} => {
let _ = self.outbound.unbounded_send(messages::pong());
}
Message {
command: BaseCommand { pong: Some(_), .. },
..
} => {
if let Some(sender) = self.ping.take() {
let _ = sender.send(());
}
}
msg => match msg.request_key() {
Some(key @ RequestKey::RequestId(_))
| Some(key @ RequestKey::ProducerSend { .. }) => {
if let Some(resolver) = self.pending_requests.remove(&key) {
let _ = resolver.send(msg);
} else {
self.received_messages.insert(key, msg);
}
}
Some(RequestKey::Consumer { consumer_id }) => {
let _ = self
.consumers
.get_mut(&consumer_id)
.map(move |consumer| consumer.unbounded_send(msg));
}
Some(RequestKey::CloseConsumer {
consumer_id,
request_id,
}) => {
if let Some(resolver) = self
.pending_requests
.remove(&RequestKey::RequestId(request_id))
{
let _ = resolver.send(msg);
} else {
let res = self
.consumers
.get_mut(&consumer_id)
.map(move |consumer| consumer.unbounded_send(msg));
if !res.as_ref().map(|r| r.is_ok()).unwrap_or(false) {
error!("ConnectionReceiver: error transmitting message to consumer: {:?}", res);
}
}
}
None => {
warn!(
"Received unexpected message; dropping. Message {:?}",
msg.command
)
}
},
},
Poll::Ready(None) => {
self.error.set(ConnectionError::Disconnected);
return Poll::Ready(Err(()));
}
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Err(e))) => {
self.error.set(e);
return Poll::Ready(Err(()));
}
}
}
}
}
#[derive(Clone)]
pub struct SerialId(Arc<AtomicUsize>);
impl Default for SerialId {
fn default() -> Self {
SerialId(Arc::new(AtomicUsize::new(0)))
}
}
impl SerialId {
pub fn new() -> Self {
Self::default()
}
pub fn get(&self) -> u64 {
self.0.fetch_add(1, Ordering::Relaxed) as u64
}
}
pub struct ConnectionSender<Exe: Executor> {
tx: mpsc::UnboundedSender<Message>,
registrations: mpsc::UnboundedSender<Register>,
receiver_shutdown: Option<oneshot::Sender<()>>,
request_id: SerialId,
error: SharedError,
executor: Arc<Exe>,
operation_timeout: Duration,
}
impl<Exe: Executor> ConnectionSender<Exe> {
pub(crate) fn new(
tx: mpsc::UnboundedSender<Message>,
registrations: mpsc::UnboundedSender<Register>,
receiver_shutdown: oneshot::Sender<()>,
request_id: SerialId,
error: SharedError,
executor: Arc<Exe>,
operation_timeout: Duration,
) -> ConnectionSender<Exe> {
ConnectionSender {
tx,
registrations,
receiver_shutdown: Some(receiver_shutdown),
request_id,
error,
executor,
operation_timeout,
}
}
pub(crate) async fn send(
&self,
producer_id: u64,
producer_name: String,
sequence_id: u64,
message: producer::ProducerMessage,
) -> Result<proto::CommandSendReceipt, ConnectionError> {
let key = RequestKey::ProducerSend {
producer_id,
sequence_id,
};
let msg = messages::send(producer_id, producer_name, sequence_id, message);
self.send_message(msg, key, |resp| resp.command.send_receipt)
.await
}
pub async fn send_ping(&self) -> Result<(), ConnectionError> {
let (resolver, response) = oneshot::channel();
trace!("sending ping");
match (
self.registrations
.unbounded_send(Register::Ping { resolver }),
self.tx.unbounded_send(messages::ping()),
) {
(Ok(_), Ok(_)) => {
let delay_f = self.executor.delay(self.operation_timeout);
pin_mut!(response);
pin_mut!(delay_f);
match select(response, delay_f).await {
Either::Left((res, _)) => res
.map_err(|oneshot::Canceled| {
self.error.set(ConnectionError::Disconnected);
ConnectionError::Disconnected
})
.map(move |_| trace!("received pong")),
Either::Right(_) => {
self.error.set(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout when sending ping to the Pulsar server",
)));
Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout when sending ping to the Pulsar server",
)))
}
}
}
_ => Err(ConnectionError::Disconnected),
}
}
pub async fn lookup_topic<S: Into<String>>(
&self,
topic: S,
authoritative: bool,
) -> Result<proto::CommandLookupTopicResponse, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::lookup_topic(topic.into(), authoritative, request_id);
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.lookup_topic_response
})
.await
}
pub async fn lookup_partitioned_topic<S: Into<String>>(
&self,
topic: S,
) -> Result<proto::CommandPartitionedTopicMetadataResponse, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::lookup_partitioned_topic(topic.into(), request_id);
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.partition_metadata_response
})
.await
}
pub async fn create_producer(
&self,
topic: String,
producer_id: u64,
producer_name: Option<String>,
options: ProducerOptions,
) -> Result<proto::CommandProducerSuccess, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::create_producer(topic, producer_name, producer_id, request_id, options);
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.producer_success
})
.await
}
pub async fn get_topics_of_namespace(
&self,
namespace: String,
mode: proto::command_get_topics_of_namespace::Mode,
) -> Result<proto::CommandGetTopicsOfNamespaceResponse, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::get_topics_of_namespace(request_id, namespace, mode);
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.get_topics_of_namespace_response
})
.await
}
pub async fn close_producer(
&self,
producer_id: u64,
) -> Result<proto::CommandSuccess, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::close_producer(producer_id, request_id);
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.success
})
.await
}
pub async fn subscribe(
&self,
resolver: mpsc::UnboundedSender<Message>,
topic: String,
subscription: String,
sub_type: SubType,
consumer_id: u64,
consumer_name: Option<String>,
options: ConsumerOptions,
) -> Result<proto::CommandSuccess, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::subscribe(
topic,
subscription,
sub_type,
consumer_id,
request_id,
consumer_name,
options,
);
match self.registrations.unbounded_send(Register::Consumer {
consumer_id,
resolver,
}) {
Ok(_) => {}
Err(_) => {
self.error.set(ConnectionError::Disconnected);
return Err(ConnectionError::Disconnected);
}
}
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.success
})
.await
}
pub fn send_flow(&self, consumer_id: u64, message_permits: u32) -> Result<(), ConnectionError> {
self.tx
.unbounded_send(messages::flow(consumer_id, message_permits))
.map_err(|_| ConnectionError::Disconnected)
}
pub fn send_ack(
&self,
consumer_id: u64,
message_ids: Vec<proto::MessageIdData>,
cumulative: bool,
) -> Result<(), ConnectionError> {
self.tx
.unbounded_send(messages::ack(consumer_id, message_ids, cumulative))
.map_err(|_| ConnectionError::Disconnected)
}
pub fn send_redeliver_unacknowleged_messages(
&self,
consumer_id: u64,
message_ids: Vec<proto::MessageIdData>,
) -> Result<(), ConnectionError> {
self.tx
.unbounded_send(messages::redeliver_unacknowleged_messages(
consumer_id,
message_ids,
))
.map_err(|_| ConnectionError::Disconnected)
}
pub async fn close_consumer(
&self,
consumer_id: u64,
) -> Result<proto::CommandSuccess, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::close_consumer(consumer_id, request_id);
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.success
})
.await
}
pub async fn seek(
&self,
consumer_id: u64,
message_id: Option<MessageIdData>,
timestamp: Option<u64>,
) -> Result<proto::CommandSuccess, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::seek(consumer_id, request_id, message_id, timestamp);
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.success
})
.await
}
pub async fn unsubscribe(
&self,
consumer_id: u64,
) -> Result<proto::CommandSuccess, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::unsubscribe(consumer_id, request_id);
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.success
})
.await
}
async fn send_message<R: Debug, F>(
&self,
msg: Message,
key: RequestKey,
extract: F,
) -> Result<R, ConnectionError>
where
F: FnOnce(Message) -> Option<R>,
{
let (resolver, response) = oneshot::channel();
trace!("sending message(key = {:?}): {:?}", key, msg);
let k = key.clone();
let response = async {
response
.await
.map_err(|oneshot::Canceled| {
self.error.set(ConnectionError::Disconnected);
ConnectionError::Disconnected
})
.map(move |message: Message| {
trace!("received message(key = {:?}): {:?}", k, message);
extract_message(message, extract)
})?
};
match (
self.registrations
.unbounded_send(Register::Request { key, resolver }),
self.tx.unbounded_send(msg),
) {
(Ok(_), Ok(_)) => {
let delay_f = self.executor.delay(self.operation_timeout);
pin_mut!(response);
pin_mut!(delay_f);
match select(response, delay_f).await {
Either::Left((res, _)) => res,
Either::Right(_) => Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout sending message to the Pulsar server",
))),
}
}
_ => Err(ConnectionError::Disconnected),
}
}
}
pub struct Connection<Exe: Executor> {
id: i64,
url: Url,
sender: ConnectionSender<Exe>,
}
impl<Exe: Executor> Connection<Exe> {
pub async fn new(
url: Url,
auth_data: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
proxy_to_broker_url: Option<String>,
certificate_chain: &[Certificate],
allow_insecure_connection: bool,
tls_hostname_verification_enabled: bool,
connection_timeout: Duration,
operation_timeout: Duration,
executor: Arc<Exe>,
) -> Result<Connection<Exe>, ConnectionError> {
if url.scheme() != "pulsar" && url.scheme() != "pulsar+ssl" {
error!("invalid scheme: {}", url.scheme());
return Err(ConnectionError::NotFound);
}
let hostname = url.host().map(|s| s.to_string());
let tls = match url.scheme() {
"pulsar" => false,
"pulsar+ssl" => true,
s => {
error!("invalid scheme: {}", s);
return Err(ConnectionError::NotFound);
}
};
let u = url.clone();
let address: SocketAddr = match executor
.spawn_blocking(move || {
u.socket_addrs(|| match u.scheme() {
"pulsar" => Some(6650),
"pulsar+ssl" => Some(6651),
_ => None,
})
.map_err(|e| {
error!("could not look up address: {:?}", e);
e
})
.ok()
.and_then(|v| {
let mut rng = thread_rng();
let index: usize = rng.gen_range(0..v.len());
v.get(index).copied()
})
})
.await
{
Some(Some(address)) => address,
_ =>
{
return Err(ConnectionError::NotFound)
}
};
let hostname = hostname.unwrap_or_else(|| address.ip().to_string());
debug!("Connecting to {}: {}", url, address);
let sender_prepare = Connection::prepare_stream(
address,
hostname,
tls,
auth_data,
proxy_to_broker_url,
certificate_chain,
allow_insecure_connection,
tls_hostname_verification_enabled,
executor.clone(),
operation_timeout,
);
let delay_f = executor.delay(connection_timeout);
pin_mut!(sender_prepare);
pin_mut!(delay_f);
let sender;
match select(sender_prepare, delay_f).await {
Either::Left((res, _)) => sender = res?,
Either::Right(_) => {
return Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout connecting to the Pulsar server",
)));
}
};
let id = rand::random();
Ok(Connection { id, url, sender })
}
async fn prepare_auth_data(auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>)
-> Result<Option<Authentication>, ConnectionError> {
match auth {
Some(m_auth) => {
let mut auth_guard = m_auth.lock().await;
Ok(Some(Authentication {
name: auth_guard.auth_method_name(),
data: auth_guard.auth_data().await?,
}))
}
None => Ok(None)
}
}
async fn prepare_stream(
address: SocketAddr,
hostname: String,
tls: bool,
auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
proxy_to_broker_url: Option<String>,
certificate_chain: &[Certificate],
allow_insecure_connection: bool,
tls_hostname_verification_enabled: bool,
executor: Arc<Exe>,
operation_timeout: Duration,
) -> Result<ConnectionSender<Exe>, ConnectionError> {
match executor.kind() {
#[cfg(feature = "tokio-runtime")]
ExecutorKind::Tokio => {
if tls {
let stream = tokio::net::TcpStream::connect(&address).await?;
let mut builder = native_tls::TlsConnector::builder();
for certificate in certificate_chain {
builder.add_root_certificate(certificate.clone());
}
builder.danger_accept_invalid_hostnames(
allow_insecure_connection && !tls_hostname_verification_enabled,
);
builder.danger_accept_invalid_certs(allow_insecure_connection);
let cx = builder.build()?;
let cx = tokio_native_tls::TlsConnector::from(cx);
let stream = cx
.connect(&hostname, stream)
.await
.map(|stream| tokio_util::codec::Framed::new(stream, Codec))?;
Connection::connect(
stream,
Self::prepare_auth_data(auth).await?,
proxy_to_broker_url,
executor,
operation_timeout,
)
.await
} else {
let stream = tokio::net::TcpStream::connect(&address)
.await
.map(|stream| tokio_util::codec::Framed::new(stream, Codec))?;
Connection::connect(
stream,
Self::prepare_auth_data(auth).await?,
proxy_to_broker_url,
executor,
operation_timeout,
)
.await
}
}
#[cfg(not(feature = "tokio-runtime"))]
ExecutorKind::Tokio => {
unimplemented!("the tokio-runtime cargo feature is not active");
}
#[cfg(feature = "async-std-runtime")]
ExecutorKind::AsyncStd => {
if tls {
let stream = async_std::net::TcpStream::connect(&address).await?;
let mut connector = async_native_tls::TlsConnector::new();
for certificate in certificate_chain {
connector = connector.add_root_certificate(certificate.clone());
}
connector = connector.danger_accept_invalid_hostnames(
allow_insecure_connection && !tls_hostname_verification_enabled,
);
connector = connector.danger_accept_invalid_certs(allow_insecure_connection);
let stream = connector
.connect(&hostname, stream)
.await
.map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;
Connection::connect(
stream,
Self::prepare_auth_data(auth).await?,
proxy_to_broker_url,
executor,
operation_timeout,
)
.await
} else {
let stream = async_std::net::TcpStream::connect(&address)
.await
.map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;
Connection::connect(
stream,
Self::prepare_auth_data(auth).await?,
proxy_to_broker_url,
executor,
operation_timeout,
)
.await
}
}
#[cfg(not(feature = "async-std-runtime"))]
ExecutorKind::AsyncStd => {
unimplemented!("the async-std-runtime cargo feature is not active");
}
}
}
pub async fn connect<S>(
mut stream: S,
auth_data: Option<Authentication>,
proxy_to_broker_url: Option<String>,
executor: Arc<Exe>,
operation_timeout: Duration,
) -> Result<ConnectionSender<Exe>, ConnectionError>
where
S: Stream<Item = Result<Message, ConnectionError>>,
S: Sink<Message, Error = ConnectionError>,
S: Send + std::marker::Unpin + 'static,
{
let _ = stream
.send({
let msg = messages::connect(auth_data, proxy_to_broker_url);
trace!("connection message: {:?}", msg);
msg
})
.await?;
let msg = stream.next().await;
match msg {
Some(Ok(Message {
command:
proto::BaseCommand {
error: Some(error), ..
},
..
})) => Err(ConnectionError::PulsarError(
crate::error::server_error(error.error),
Some(error.message),
)),
Some(Ok(msg)) => {
let cmd = msg.command.clone();
trace!("received connection response: {:?}", msg);
msg.command.connected.ok_or_else(|| {
ConnectionError::Unexpected(format!(
"Unexpected message from pulsar: {:?}",
cmd
))
})
}
Some(Err(e)) => Err(e),
None => Err(ConnectionError::Disconnected),
}?;
let (mut sink, stream) = stream.split();
let (tx, mut rx) = mpsc::unbounded();
let (registrations_tx, registrations_rx) = mpsc::unbounded();
let error = SharedError::new();
let (receiver_shutdown_tx, receiver_shutdown_rx) = oneshot::channel();
if executor
.spawn(Box::pin(
Receiver::new(
stream,
tx.clone(),
error.clone(),
registrations_rx,
receiver_shutdown_rx,
)
.map(|_| ()),
))
.is_err()
{
error!("the executor could not spawn the Receiver future");
return Err(ConnectionError::Shutdown);
}
let err = error.clone();
let res = executor.spawn(Box::pin(async move {
while let Some(msg) = rx.next().await {
if let Err(e) = sink.send(msg).await {
err.set(e);
break;
}
}
}));
if res.is_err() {
error!("the executor could not spawn the Receiver future");
return Err(ConnectionError::Shutdown);
}
let sender = ConnectionSender::new(
tx,
registrations_tx,
receiver_shutdown_tx,
SerialId::new(),
error,
executor.clone(),
operation_timeout,
);
Ok(sender)
}
pub fn id(&self) -> i64 {
self.id
}
pub fn error(&self) -> Option<ConnectionError> {
self.sender.error.remove()
}
pub fn is_valid(&self) -> bool {
!self.sender.error.is_set()
}
pub fn url(&self) -> &Url {
&self.url
}
pub fn sender(&self) -> &ConnectionSender<Exe> {
&self.sender
}
}
impl<Exe: Executor> Drop for Connection<Exe> {
fn drop(&mut self) {
trace!("dropping connection {} for {}", self.id, self.url);
if let Some(shutdown) = self.sender.receiver_shutdown.take() {
let _ = shutdown.send(());
}
}
}
fn extract_message<T: Debug, F>(message: Message, extract: F) -> Result<T, ConnectionError>
where
F: FnOnce(Message) -> Option<T>,
{
if let Some(e) = message.command.error {
Err(ConnectionError::PulsarError(
crate::error::server_error(e.error),
Some(e.message),
))
} else {
let cmd = message.command.clone();
if let Some(extracted) = extract(message) {
trace!("extracted message: {:?}", extracted);
Ok(extracted)
} else {
Err(ConnectionError::UnexpectedResponse(format!("{:?}", cmd)))
}
}
}
pub(crate) mod messages {
use chrono::Utc;
use proto::MessageIdData;
use crate::connection::Authentication;
use crate::consumer::ConsumerOptions;
use crate::message::{
proto::{self, base_command::Type as CommandType, command_subscribe::SubType},
Message, Payload,
};
use crate::producer::{self, ProducerOptions};
pub fn connect(auth: Option<Authentication>, proxy_to_broker_url: Option<String>) -> Message {
let (auth_method_name, auth_data) = match auth {
Some(auth) => (Some(auth.name), Some(auth.data)),
None => (None, None),
};
Message {
command: proto::BaseCommand {
r#type: CommandType::Connect as i32,
connect: Some(proto::CommandConnect {
auth_method_name,
auth_data,
proxy_to_broker_url,
client_version: String::from("2.0.1-incubating"),
protocol_version: Some(12),
..Default::default()
}),
..Default::default()
},
payload: None,
}
}
pub fn ping() -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::Ping as i32,
ping: Some(proto::CommandPing {}),
..Default::default()
},
payload: None,
}
}
pub fn pong() -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::Pong as i32,
pong: Some(proto::CommandPong {}),
..Default::default()
},
payload: None,
}
}
pub fn create_producer(
topic: String,
producer_name: Option<String>,
producer_id: u64,
request_id: u64,
options: ProducerOptions,
) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::Producer as i32,
producer: Some(proto::CommandProducer {
topic,
producer_id,
request_id,
producer_name,
encrypted: options.encrypted,
metadata: options
.metadata
.iter()
.map(|(k, v)| proto::KeyValue {
key: k.clone(),
value: v.clone(),
})
.collect(),
schema: options.schema,
..Default::default()
}),
..Default::default()
},
payload: None,
}
}
pub fn get_topics_of_namespace(
request_id: u64,
namespace: String,
mode: proto::command_get_topics_of_namespace::Mode,
) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::GetTopicsOfNamespace as i32,
get_topics_of_namespace: Some(proto::CommandGetTopicsOfNamespace {
request_id,
namespace,
mode: Some(mode as i32),
}),
..Default::default()
},
payload: None,
}
}
pub(crate) fn send(
producer_id: u64,
producer_name: String,
sequence_id: u64,
message: producer::ProducerMessage,
) -> Message {
let properties = message
.properties
.into_iter()
.map(|(key, value)| proto::KeyValue { key, value })
.collect();
Message {
command: proto::BaseCommand {
r#type: CommandType::Send as i32,
send: Some(proto::CommandSend {
producer_id,
sequence_id,
num_messages: message.num_messages_in_batch,
..Default::default()
}),
..Default::default()
},
payload: Some(Payload {
metadata: proto::MessageMetadata {
producer_name,
sequence_id,
properties,
publish_time: Utc::now().timestamp_millis() as u64,
replicated_from: None,
partition_key: message.partition_key,
replicate_to: message.replicate_to,
compression: message.compression,
uncompressed_size: message.uncompressed_size,
num_messages_in_batch: message.num_messages_in_batch,
event_time: message.event_time,
encryption_keys: message.encryption_keys,
encryption_algo: message.encryption_algo,
encryption_param: message.encryption_param,
schema_version: message.schema_version,
deliver_at_time: message.deliver_at_time,
..Default::default()
},
data: message.payload,
}),
}
}
pub fn lookup_topic(topic: String, authoritative: bool, request_id: u64) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::Lookup as i32,
lookup_topic: Some(proto::CommandLookupTopic {
topic,
request_id,
authoritative: Some(authoritative),
..Default::default()
}),
..Default::default()
},
payload: None,
}
}
pub fn lookup_partitioned_topic(topic: String, request_id: u64) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::PartitionedMetadata as i32,
partition_metadata: Some(proto::CommandPartitionedTopicMetadata {
topic,
request_id,
..Default::default()
}),
..Default::default()
},
payload: None,
}
}
pub fn close_producer(producer_id: u64, request_id: u64) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::CloseProducer as i32,
close_producer: Some(proto::CommandCloseProducer {
producer_id,
request_id,
}),
..Default::default()
},
payload: None,
}
}
pub fn subscribe(
topic: String,
subscription: String,
sub_type: SubType,
consumer_id: u64,
request_id: u64,
consumer_name: Option<String>,
options: ConsumerOptions,
) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::Subscribe as i32,
subscribe: Some(proto::CommandSubscribe {
topic,
subscription,
sub_type: sub_type as i32,
consumer_id,
request_id,
consumer_name,
priority_level: options.priority_level,
durable: options.durable,
metadata: options
.metadata
.iter()
.map(|(k, v)| proto::KeyValue {
key: k.clone(),
value: v.clone(),
})
.collect(),
read_compacted: Some(options.read_compacted.unwrap_or(false)),
initial_position: Some(options.initial_position.into()),
schema: options.schema,
start_message_id: options.start_message_id,
..Default::default()
}),
..Default::default()
},
payload: None,
}
}
pub fn flow(consumer_id: u64, message_permits: u32) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::Flow as i32,
flow: Some(proto::CommandFlow {
consumer_id,
message_permits,
}),
..Default::default()
},
payload: None,
}
}
pub fn ack(
consumer_id: u64,
message_id: Vec<proto::MessageIdData>,
cumulative: bool,
) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::Ack as i32,
ack: Some(proto::CommandAck {
consumer_id,
ack_type: if cumulative {
proto::command_ack::AckType::Cumulative as i32
} else {
proto::command_ack::AckType::Individual as i32
},
message_id,
validation_error: None,
properties: Vec::new(),
..Default::default()
}),
..Default::default()
},
payload: None,
}
}
pub fn redeliver_unacknowleged_messages(
consumer_id: u64,
message_ids: Vec<proto::MessageIdData>,
) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::RedeliverUnacknowledgedMessages as i32,
redeliver_unacknowledged_messages: Some(
proto::CommandRedeliverUnacknowledgedMessages {
consumer_id,
message_ids,
},
),
..Default::default()
},
payload: None,
}
}
pub fn close_consumer(consumer_id: u64, request_id: u64) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::CloseConsumer as i32,
close_consumer: Some(proto::CommandCloseConsumer {
consumer_id,
request_id,
}),
..Default::default()
},
payload: None,
}
}
pub fn seek(
consumer_id: u64,
request_id: u64,
message_id: Option<MessageIdData>,
message_publish_time: Option<u64>,
) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::Seek as i32,
seek: Some(proto::CommandSeek {
consumer_id,
request_id,
message_id,
message_publish_time,
}),
..Default::default()
},
payload: None,
}
}
pub fn unsubscribe(consumer_id: u64, request_id: u64) -> Message {
Message {
command: proto::BaseCommand {
r#type: CommandType::Unsubscribe as i32,
unsubscribe: Some(proto::CommandUnsubscribe {
consumer_id,
request_id,
}),
..Default::default()
},
payload: None,
}
}
}