1use native_tls::Certificate;
2use proto::MessageIdData;
3use rand::{thread_rng, Rng};
4use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::sync::{
9 atomic::{AtomicUsize, Ordering},
10 Arc,
11};
12use std::time::Duration;
13
14use futures::{
15 self,
16 channel::{mpsc, oneshot},
17 future::{select, Either},
18 pin_mut,
19 task::{Context, Poll},
20 Future, FutureExt, Sink, SinkExt, Stream, StreamExt,
21};
22use url::Url;
23
24use crate::consumer::ConsumerOptions;
25use crate::error::{ConnectionError, SharedError, AuthenticationError};
26use crate::executor::{Executor, ExecutorKind};
27use crate::message::{
28 proto::{self, command_subscribe::SubType},
29 BaseCommand, Codec, Message,
30};
31use crate::producer::{self, ProducerOptions};
32use async_trait::async_trait;
33use futures::lock::Mutex;
34
35pub(crate) enum Register {
36 Request {
37 key: RequestKey,
38 resolver: oneshot::Sender<Message>,
39 },
40 Consumer {
41 consumer_id: u64,
42 resolver: mpsc::UnboundedSender<Message>,
43 },
44 Ping {
45 resolver: oneshot::Sender<()>,
46 },
47}
48
49#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq)]
51pub enum RequestKey {
52 RequestId(u64),
53 ProducerSend { producer_id: u64, sequence_id: u64 },
54 Consumer { consumer_id: u64 },
55 CloseConsumer { consumer_id: u64, request_id: u64 },
56}
57
58#[derive(Clone)]
60pub struct Authentication {
61 pub name: String,
63 pub data: Vec<u8>,
65}
66
67#[async_trait]
68impl crate::authentication::Authentication for Authentication {
69 fn auth_method_name(&self) -> String {
70 self.name.clone()
71 }
72
73 async fn initialize(&mut self) -> Result<(), AuthenticationError> {
74 Ok(())
75 }
76
77 async fn auth_data(&mut self) -> Result<Vec<u8>, AuthenticationError> {
78 Ok(self.data.clone())
79 }
80}
81
82pub(crate) struct Receiver<S: Stream<Item = Result<Message, ConnectionError>>> {
83 inbound: Pin<Box<S>>,
84 outbound: mpsc::UnboundedSender<Message>,
85 error: SharedError,
86 pending_requests: BTreeMap<RequestKey, oneshot::Sender<Message>>,
87 consumers: BTreeMap<u64, mpsc::UnboundedSender<Message>>,
88 received_messages: BTreeMap<RequestKey, Message>,
89 registrations: Pin<Box<mpsc::UnboundedReceiver<Register>>>,
90 shutdown: Pin<Box<oneshot::Receiver<()>>>,
91 ping: Option<oneshot::Sender<()>>,
92}
93
94impl<S: Stream<Item = Result<Message, ConnectionError>>> Receiver<S> {
95 pub fn new(
96 inbound: S,
97 outbound: mpsc::UnboundedSender<Message>,
98 error: SharedError,
99 registrations: mpsc::UnboundedReceiver<Register>,
100 shutdown: oneshot::Receiver<()>,
101 ) -> Receiver<S> {
102 Receiver {
103 inbound: Box::pin(inbound),
104 outbound,
105 error,
106 pending_requests: BTreeMap::new(),
107 received_messages: BTreeMap::new(),
108 consumers: BTreeMap::new(),
109 registrations: Box::pin(registrations),
110 shutdown: Box::pin(shutdown),
111 ping: None,
112 }
113 }
114}
115
116impl<S: Stream<Item = Result<Message, ConnectionError>>> Future for Receiver<S> {
117 type Output = Result<(), ()>;
118
119 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
120 match self.shutdown.as_mut().poll(cx) {
121 Poll::Ready(Ok(())) | Poll::Ready(Err(futures::channel::oneshot::Canceled)) => {
122 return Poll::Ready(Err(()))
123 }
124 Poll::Pending => {}
125 }
126
127 loop {
129 match self.registrations.as_mut().poll_next(cx) {
130 Poll::Ready(Some(Register::Request { key, resolver })) => {
131 match self.received_messages.remove(&key) {
132 Some(msg) => {
133 let _ = resolver.send(msg);
134 }
135 None => {
136 self.pending_requests.insert(key, resolver);
137 }
138 }
139 }
140 Poll::Ready(Some(Register::Consumer {
141 consumer_id,
142 resolver,
143 })) => {
144 self.consumers.insert(consumer_id, resolver);
145 }
146 Poll::Ready(Some(Register::Ping { resolver })) => {
147 self.ping = Some(resolver);
148 }
149 Poll::Ready(None) => {
150 self.error.set(ConnectionError::Disconnected);
151 return Poll::Ready(Err(()));
152 }
153 Poll::Pending => break,
154 }
155 }
156
157 loop {
158 match self.inbound.as_mut().poll_next(cx) {
159 Poll::Ready(Some(Ok(msg))) => match msg {
160 Message {
161 command: BaseCommand { ping: Some(_), .. },
162 ..
163 } => {
164 let _ = self.outbound.unbounded_send(messages::pong());
165 }
166 Message {
167 command: BaseCommand { pong: Some(_), .. },
168 ..
169 } => {
170 if let Some(sender) = self.ping.take() {
171 let _ = sender.send(());
172 }
173 }
174 msg => match msg.request_key() {
175 Some(key @ RequestKey::RequestId(_))
176 | Some(key @ RequestKey::ProducerSend { .. }) => {
177 if let Some(resolver) = self.pending_requests.remove(&key) {
178 let _ = resolver.send(msg);
180 } else {
181 self.received_messages.insert(key, msg);
182 }
183 }
184 Some(RequestKey::Consumer { consumer_id }) => {
185 let _ = self
186 .consumers
187 .get_mut(&consumer_id)
188 .map(move |consumer| consumer.unbounded_send(msg));
189 }
190 Some(RequestKey::CloseConsumer {
191 consumer_id,
192 request_id,
193 }) => {
194 if let Some(resolver) = self
197 .pending_requests
198 .remove(&RequestKey::RequestId(request_id))
199 {
200 let _ = resolver.send(msg);
202 } else {
203 let res = self
204 .consumers
205 .get_mut(&consumer_id)
206 .map(move |consumer| consumer.unbounded_send(msg));
207
208 if !res.as_ref().map(|r| r.is_ok()).unwrap_or(false) {
209 error!("ConnectionReceiver: error transmitting message to consumer: {:?}", res);
210 }
211 }
212 }
213 None => {
214 warn!(
215 "Received unexpected message; dropping. Message {:?}",
216 msg.command
217 )
218 }
219 },
220 },
221 Poll::Ready(None) => {
222 self.error.set(ConnectionError::Disconnected);
223 return Poll::Ready(Err(()));
224 }
225 Poll::Pending => return Poll::Pending,
226 Poll::Ready(Some(Err(e))) => {
227 self.error.set(e);
228 return Poll::Ready(Err(()));
229 }
230 }
231 }
232 }
233}
234
235#[derive(Clone)]
236pub struct SerialId(Arc<AtomicUsize>);
237
238impl Default for SerialId {
239 fn default() -> Self {
240 SerialId(Arc::new(AtomicUsize::new(0)))
241 }
242}
243
244impl SerialId {
245 pub fn new() -> Self {
246 Self::default()
247 }
248 pub fn get(&self) -> u64 {
249 self.0.fetch_add(1, Ordering::Relaxed) as u64
250 }
251}
252
253pub struct ConnectionSender<Exe: Executor> {
256 tx: mpsc::UnboundedSender<Message>,
257 registrations: mpsc::UnboundedSender<Register>,
258 receiver_shutdown: Option<oneshot::Sender<()>>,
259 request_id: SerialId,
260 error: SharedError,
261 executor: Arc<Exe>,
262 operation_timeout: Duration,
263}
264
265impl<Exe: Executor> ConnectionSender<Exe> {
266 pub(crate) fn new(
267 tx: mpsc::UnboundedSender<Message>,
268 registrations: mpsc::UnboundedSender<Register>,
269 receiver_shutdown: oneshot::Sender<()>,
270 request_id: SerialId,
271 error: SharedError,
272 executor: Arc<Exe>,
273 operation_timeout: Duration,
274 ) -> ConnectionSender<Exe> {
275 ConnectionSender {
276 tx,
277 registrations,
278 receiver_shutdown: Some(receiver_shutdown),
279 request_id,
280 error,
281 executor,
282 operation_timeout,
283 }
284 }
285
286 pub(crate) async fn send(
287 &self,
288 producer_id: u64,
289 producer_name: String,
290 sequence_id: u64,
291 message: producer::ProducerMessage,
292 ) -> Result<proto::CommandSendReceipt, ConnectionError> {
293 let key = RequestKey::ProducerSend {
294 producer_id,
295 sequence_id,
296 };
297 let msg = messages::send(producer_id, producer_name, sequence_id, message);
298 self.send_message(msg, key, |resp| resp.command.send_receipt)
299 .await
300 }
301
302 pub async fn send_ping(&self) -> Result<(), ConnectionError> {
303 let (resolver, response) = oneshot::channel();
304 trace!("sending ping");
305
306 match (
307 self.registrations
308 .unbounded_send(Register::Ping { resolver }),
309 self.tx.unbounded_send(messages::ping()),
310 ) {
311 (Ok(_), Ok(_)) => {
312 let delay_f = self.executor.delay(self.operation_timeout);
313 pin_mut!(response);
314 pin_mut!(delay_f);
315
316 match select(response, delay_f).await {
317 Either::Left((res, _)) => res
318 .map_err(|oneshot::Canceled| {
319 self.error.set(ConnectionError::Disconnected);
320 ConnectionError::Disconnected
321 })
322 .map(move |_| trace!("received pong")),
323 Either::Right(_) => {
324 self.error.set(ConnectionError::Io(std::io::Error::new(
325 std::io::ErrorKind::TimedOut,
326 "timeout when sending ping to the Pulsar server",
327 )));
328 Err(ConnectionError::Io(std::io::Error::new(
329 std::io::ErrorKind::TimedOut,
330 "timeout when sending ping to the Pulsar server",
331 )))
332 }
333 }
334 }
335 _ => Err(ConnectionError::Disconnected),
336 }
337 }
338
339 pub async fn lookup_topic<S: Into<String>>(
340 &self,
341 topic: S,
342 authoritative: bool,
343 ) -> Result<proto::CommandLookupTopicResponse, ConnectionError> {
344 let request_id = self.request_id.get();
345 let msg = messages::lookup_topic(topic.into(), authoritative, request_id);
346 self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
347 resp.command.lookup_topic_response
348 })
349 .await
350 }
351
352 pub async fn lookup_partitioned_topic<S: Into<String>>(
353 &self,
354 topic: S,
355 ) -> Result<proto::CommandPartitionedTopicMetadataResponse, ConnectionError> {
356 let request_id = self.request_id.get();
357 let msg = messages::lookup_partitioned_topic(topic.into(), request_id);
358 self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
359 resp.command.partition_metadata_response
360 })
361 .await
362 }
363
364 pub async fn create_producer(
365 &self,
366 topic: String,
367 producer_id: u64,
368 producer_name: Option<String>,
369 options: ProducerOptions,
370 ) -> Result<proto::CommandProducerSuccess, ConnectionError> {
371 let request_id = self.request_id.get();
372 let msg = messages::create_producer(topic, producer_name, producer_id, request_id, options);
373 self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
374 resp.command.producer_success
375 })
376 .await
377 }
378
379 pub async fn get_topics_of_namespace(
380 &self,
381 namespace: String,
382 mode: proto::command_get_topics_of_namespace::Mode,
383 ) -> Result<proto::CommandGetTopicsOfNamespaceResponse, ConnectionError> {
384 let request_id = self.request_id.get();
385 let msg = messages::get_topics_of_namespace(request_id, namespace, mode);
386 self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
387 resp.command.get_topics_of_namespace_response
388 })
389 .await
390 }
391
392 pub async fn close_producer(
393 &self,
394 producer_id: u64,
395 ) -> Result<proto::CommandSuccess, ConnectionError> {
396 let request_id = self.request_id.get();
397 let msg = messages::close_producer(producer_id, request_id);
398 self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
399 resp.command.success
400 })
401 .await
402 }
403
404 pub async fn subscribe(
405 &self,
406 resolver: mpsc::UnboundedSender<Message>,
407 topic: String,
408 subscription: String,
409 sub_type: SubType,
410 consumer_id: u64,
411 consumer_name: Option<String>,
412 options: ConsumerOptions,
413 ) -> Result<proto::CommandSuccess, ConnectionError> {
414 let request_id = self.request_id.get();
415 let msg = messages::subscribe(
416 topic,
417 subscription,
418 sub_type,
419 consumer_id,
420 request_id,
421 consumer_name,
422 options,
423 );
424 match self.registrations.unbounded_send(Register::Consumer {
425 consumer_id,
426 resolver,
427 }) {
428 Ok(_) => {}
429 Err(_) => {
430 self.error.set(ConnectionError::Disconnected);
431 return Err(ConnectionError::Disconnected);
432 }
433 }
434 self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
435 resp.command.success
436 })
437 .await
438 }
439
440 pub fn send_flow(&self, consumer_id: u64, message_permits: u32) -> Result<(), ConnectionError> {
441 self.tx
442 .unbounded_send(messages::flow(consumer_id, message_permits))
443 .map_err(|_| ConnectionError::Disconnected)
444 }
445
446 pub fn send_ack(
447 &self,
448 consumer_id: u64,
449 message_ids: Vec<proto::MessageIdData>,
450 cumulative: bool,
451 ) -> Result<(), ConnectionError> {
452 self.tx
453 .unbounded_send(messages::ack(consumer_id, message_ids, cumulative))
454 .map_err(|_| ConnectionError::Disconnected)
455 }
456
457 pub fn send_redeliver_unacknowleged_messages(
458 &self,
459 consumer_id: u64,
460 message_ids: Vec<proto::MessageIdData>,
461 ) -> Result<(), ConnectionError> {
462 self.tx
463 .unbounded_send(messages::redeliver_unacknowleged_messages(
464 consumer_id,
465 message_ids,
466 ))
467 .map_err(|_| ConnectionError::Disconnected)
468 }
469
470 pub async fn close_consumer(
471 &self,
472 consumer_id: u64,
473 ) -> Result<proto::CommandSuccess, ConnectionError> {
474 let request_id = self.request_id.get();
475 let msg = messages::close_consumer(consumer_id, request_id);
476 self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
477 resp.command.success
478 })
479 .await
480 }
481
482 pub async fn seek(
483 &self,
484 consumer_id: u64,
485 message_id: Option<MessageIdData>,
486 timestamp: Option<u64>,
487 ) -> Result<proto::CommandSuccess, ConnectionError> {
488 let request_id = self.request_id.get();
489 let msg = messages::seek(consumer_id, request_id, message_id, timestamp);
490 self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
491 resp.command.success
492 })
493 .await
494 }
495
496 pub async fn unsubscribe(
497 &self,
498 consumer_id: u64,
499 ) -> Result<proto::CommandSuccess, ConnectionError> {
500 let request_id = self.request_id.get();
501 let msg = messages::unsubscribe(consumer_id, request_id);
502 self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
503 resp.command.success
504 })
505 .await
506 }
507
508 async fn send_message<R: Debug, F>(
509 &self,
510 msg: Message,
511 key: RequestKey,
512 extract: F,
513 ) -> Result<R, ConnectionError>
514 where
515 F: FnOnce(Message) -> Option<R>,
516 {
517 let (resolver, response) = oneshot::channel();
518 trace!("sending message(key = {:?}): {:?}", key, msg);
519
520 let k = key.clone();
521 let response = async {
522 response
523 .await
524 .map_err(|oneshot::Canceled| {
525 self.error.set(ConnectionError::Disconnected);
526 ConnectionError::Disconnected
527 })
528 .map(move |message: Message| {
529 trace!("received message(key = {:?}): {:?}", k, message);
530 extract_message(message, extract)
531 })?
532 };
533
534 match (
535 self.registrations
536 .unbounded_send(Register::Request { key, resolver }),
537 self.tx.unbounded_send(msg),
538 ) {
539 (Ok(_), Ok(_)) => {
540 let delay_f = self.executor.delay(self.operation_timeout);
541 pin_mut!(response);
542 pin_mut!(delay_f);
543
544 match select(response, delay_f).await {
545 Either::Left((res, _)) => res,
546 Either::Right(_) => Err(ConnectionError::Io(std::io::Error::new(
547 std::io::ErrorKind::TimedOut,
548 "timeout sending message to the Pulsar server",
549 ))),
550 }
551 }
552 _ => Err(ConnectionError::Disconnected),
553 }
554 }
555}
556
557pub struct Connection<Exe: Executor> {
558 id: i64,
559 url: Url,
560 sender: ConnectionSender<Exe>,
561}
562
563impl<Exe: Executor> Connection<Exe> {
564 pub async fn new(
565 url: Url,
566 auth_data: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
567 proxy_to_broker_url: Option<String>,
568 certificate_chain: &[Certificate],
569 allow_insecure_connection: bool,
570 tls_hostname_verification_enabled: bool,
571 connection_timeout: Duration,
572 operation_timeout: Duration,
573 executor: Arc<Exe>,
574 ) -> Result<Connection<Exe>, ConnectionError> {
575 if url.scheme() != "pulsar" && url.scheme() != "pulsar+ssl" {
576 error!("invalid scheme: {}", url.scheme());
577 return Err(ConnectionError::NotFound);
578 }
579 let hostname = url.host().map(|s| s.to_string());
580
581 let tls = match url.scheme() {
582 "pulsar" => false,
583 "pulsar+ssl" => true,
584 s => {
585 error!("invalid scheme: {}", s);
586 return Err(ConnectionError::NotFound);
587 }
588 };
589
590 let u = url.clone();
591 let address: SocketAddr = match executor
592 .spawn_blocking(move || {
593 u.socket_addrs(|| match u.scheme() {
594 "pulsar" => Some(6650),
595 "pulsar+ssl" => Some(6651),
596 _ => None,
597 })
598 .map_err(|e| {
599 error!("could not look up address: {:?}", e);
600 e
601 })
602 .ok()
603 .and_then(|v| {
604 let mut rng = thread_rng();
605 let index: usize = rng.gen_range(0..v.len());
606 v.get(index).copied()
607 })
608 })
609 .await
610 {
611 Some(Some(address)) => address,
612 _ =>
613 {
615 return Err(ConnectionError::NotFound)
616 }
617 };
618
619 let hostname = hostname.unwrap_or_else(|| address.ip().to_string());
620
621 debug!("Connecting to {}: {}", url, address);
622 let sender_prepare = Connection::prepare_stream(
623 address,
624 hostname,
625 tls,
626 auth_data,
627 proxy_to_broker_url,
628 certificate_chain,
629 allow_insecure_connection,
630 tls_hostname_verification_enabled,
631 executor.clone(),
632 operation_timeout,
633 );
634 let delay_f = executor.delay(connection_timeout);
635
636 pin_mut!(sender_prepare);
637 pin_mut!(delay_f);
638
639 let sender;
640 match select(sender_prepare, delay_f).await {
641 Either::Left((res, _)) => sender = res?,
642 Either::Right(_) => {
643 return Err(ConnectionError::Io(std::io::Error::new(
644 std::io::ErrorKind::TimedOut,
645 "timeout connecting to the Pulsar server",
646 )));
647 }
648 };
649
650 let id = rand::random();
651 Ok(Connection { id, url, sender })
652 }
653
654 async fn prepare_auth_data(auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>)
655 -> Result<Option<Authentication>, ConnectionError> {
656 match auth {
657 Some(m_auth) => {
658 let mut auth_guard = m_auth.lock().await;
659 Ok(Some(Authentication {
660 name: auth_guard.auth_method_name(),
661 data: auth_guard.auth_data().await?,
662 }))
663 }
664 None => Ok(None)
665 }
666 }
667
668 async fn prepare_stream(
669 address: SocketAddr,
670 hostname: String,
671 tls: bool,
672 auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
673 proxy_to_broker_url: Option<String>,
674 certificate_chain: &[Certificate],
675 allow_insecure_connection: bool,
676 tls_hostname_verification_enabled: bool,
677 executor: Arc<Exe>,
678 operation_timeout: Duration,
679 ) -> Result<ConnectionSender<Exe>, ConnectionError> {
680 match executor.kind() {
681 #[cfg(feature = "tokio-runtime")]
682 ExecutorKind::Tokio => {
683 if tls {
684 let stream = tokio::net::TcpStream::connect(&address).await?;
685
686 let mut builder = native_tls::TlsConnector::builder();
687 for certificate in certificate_chain {
688 builder.add_root_certificate(certificate.clone());
689 }
690 builder.danger_accept_invalid_hostnames(
691 allow_insecure_connection && !tls_hostname_verification_enabled,
692 );
693 builder.danger_accept_invalid_certs(allow_insecure_connection);
694 let cx = builder.build()?;
695 let cx = tokio_native_tls::TlsConnector::from(cx);
696 let stream = cx
697 .connect(&hostname, stream)
698 .await
699 .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?;
700
701 Connection::connect(
702 stream,
703 Self::prepare_auth_data(auth).await?,
704 proxy_to_broker_url,
705 executor,
706 operation_timeout,
707 )
708 .await
709 } else {
710 let stream = tokio::net::TcpStream::connect(&address)
711 .await
712 .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?;
713
714 Connection::connect(
715 stream,
716 Self::prepare_auth_data(auth).await?,
717 proxy_to_broker_url,
718 executor,
719 operation_timeout,
720 )
721 .await
722 }
723 }
724 #[cfg(not(feature = "tokio-runtime"))]
725 ExecutorKind::Tokio => {
726 unimplemented!("the tokio-runtime cargo feature is not active");
727 }
728 #[cfg(feature = "async-std-runtime")]
729 ExecutorKind::AsyncStd => {
730 if tls {
731 let stream = async_std::net::TcpStream::connect(&address).await?;
732 let mut connector = async_native_tls::TlsConnector::new();
733 for certificate in certificate_chain {
734 connector = connector.add_root_certificate(certificate.clone());
735 }
736 connector = connector.danger_accept_invalid_hostnames(
737 allow_insecure_connection && !tls_hostname_verification_enabled,
738 );
739 connector = connector.danger_accept_invalid_certs(allow_insecure_connection);
740 let stream = connector
741 .connect(&hostname, stream)
742 .await
743 .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;
744
745 Connection::connect(
746 stream,
747 Self::prepare_auth_data(auth).await?,
748 proxy_to_broker_url,
749 executor,
750 operation_timeout,
751 )
752 .await
753 } else {
754 let stream = async_std::net::TcpStream::connect(&address)
755 .await
756 .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;
757
758 Connection::connect(
759 stream,
760 Self::prepare_auth_data(auth).await?,
761 proxy_to_broker_url,
762 executor,
763 operation_timeout,
764 )
765 .await
766 }
767 }
768 #[cfg(not(feature = "async-std-runtime"))]
769 ExecutorKind::AsyncStd => {
770 unimplemented!("the async-std-runtime cargo feature is not active");
771 }
772 }
773 }
774
775 pub async fn connect<S>(
776 mut stream: S,
777 auth_data: Option<Authentication>,
778 proxy_to_broker_url: Option<String>,
779 executor: Arc<Exe>,
780 operation_timeout: Duration,
781 ) -> Result<ConnectionSender<Exe>, ConnectionError>
782 where
783 S: Stream<Item = Result<Message, ConnectionError>>,
784 S: Sink<Message, Error = ConnectionError>,
785 S: Send + std::marker::Unpin + 'static,
786 {
787 let _ = stream
788 .send({
789 let msg = messages::connect(auth_data, proxy_to_broker_url);
790 trace!("connection message: {:?}", msg);
791 msg
792 })
793 .await?;
794
795 let msg = stream.next().await;
796 match msg {
797 Some(Ok(Message {
798 command:
799 proto::BaseCommand {
800 error: Some(error), ..
801 },
802 ..
803 })) => Err(ConnectionError::PulsarError(
804 crate::error::server_error(error.error),
805 Some(error.message),
806 )),
807 Some(Ok(msg)) => {
808 let cmd = msg.command.clone();
809 trace!("received connection response: {:?}", msg);
810 msg.command.connected.ok_or_else(|| {
811 ConnectionError::Unexpected(format!(
812 "Unexpected message from pulsar: {:?}",
813 cmd
814 ))
815 })
816 }
817 Some(Err(e)) => Err(e),
818 None => Err(ConnectionError::Disconnected),
819 }?;
820
821 let (mut sink, stream) = stream.split();
822 let (tx, mut rx) = mpsc::unbounded();
823 let (registrations_tx, registrations_rx) = mpsc::unbounded();
824 let error = SharedError::new();
825 let (receiver_shutdown_tx, receiver_shutdown_rx) = oneshot::channel();
826
827 if executor
828 .spawn(Box::pin(
829 Receiver::new(
830 stream,
831 tx.clone(),
832 error.clone(),
833 registrations_rx,
834 receiver_shutdown_rx,
835 )
836 .map(|_| ()),
837 ))
838 .is_err()
839 {
840 error!("the executor could not spawn the Receiver future");
841 return Err(ConnectionError::Shutdown);
842 }
843
844 let err = error.clone();
845 let res = executor.spawn(Box::pin(async move {
846 while let Some(msg) = rx.next().await {
847 if let Err(e) = sink.send(msg).await {
848 err.set(e);
849 break;
850 }
851 }
852 }));
853 if res.is_err() {
854 error!("the executor could not spawn the Receiver future");
855 return Err(ConnectionError::Shutdown);
856 }
857
858 let sender = ConnectionSender::new(
859 tx,
860 registrations_tx,
861 receiver_shutdown_tx,
862 SerialId::new(),
863 error,
864 executor.clone(),
865 operation_timeout,
866 );
867
868 Ok(sender)
869 }
870
871 pub fn id(&self) -> i64 {
872 self.id
873 }
874
875 pub fn error(&self) -> Option<ConnectionError> {
876 self.sender.error.remove()
877 }
878
879 pub fn is_valid(&self) -> bool {
880 !self.sender.error.is_set()
881 }
882
883 pub fn url(&self) -> &Url {
884 &self.url
885 }
886
887 pub fn sender(&self) -> &ConnectionSender<Exe> {
889 &self.sender
890 }
891}
892
893impl<Exe: Executor> Drop for Connection<Exe> {
894 fn drop(&mut self) {
895 trace!("dropping connection {} for {}", self.id, self.url);
896 if let Some(shutdown) = self.sender.receiver_shutdown.take() {
897 let _ = shutdown.send(());
898 }
899 }
900}
901
902fn extract_message<T: Debug, F>(message: Message, extract: F) -> Result<T, ConnectionError>
903where
904 F: FnOnce(Message) -> Option<T>,
905{
906 if let Some(e) = message.command.error {
907 Err(ConnectionError::PulsarError(
908 crate::error::server_error(e.error),
909 Some(e.message),
910 ))
911 } else {
912 let cmd = message.command.clone();
913 if let Some(extracted) = extract(message) {
914 trace!("extracted message: {:?}", extracted);
915 Ok(extracted)
916 } else {
917 Err(ConnectionError::UnexpectedResponse(format!("{:?}", cmd)))
918 }
919 }
920}
921
922pub(crate) mod messages {
923 use chrono::Utc;
924 use proto::MessageIdData;
925
926 use crate::connection::Authentication;
927 use crate::consumer::ConsumerOptions;
928 use crate::message::{
929 proto::{self, base_command::Type as CommandType, command_subscribe::SubType},
930 Message, Payload,
931 };
932 use crate::producer::{self, ProducerOptions};
933
934 pub fn connect(auth: Option<Authentication>, proxy_to_broker_url: Option<String>) -> Message {
935 let (auth_method_name, auth_data) = match auth {
936 Some(auth) => (Some(auth.name), Some(auth.data)),
937 None => (None, None),
938 };
939
940 Message {
941 command: proto::BaseCommand {
942 r#type: CommandType::Connect as i32,
943 connect: Some(proto::CommandConnect {
944 auth_method_name,
945 auth_data,
946 proxy_to_broker_url,
947 client_version: String::from("2.0.1-incubating"),
948 protocol_version: Some(12),
949 ..Default::default()
950 }),
951 ..Default::default()
952 },
953 payload: None,
954 }
955 }
956
957 pub fn ping() -> Message {
958 Message {
959 command: proto::BaseCommand {
960 r#type: CommandType::Ping as i32,
961 ping: Some(proto::CommandPing {}),
962 ..Default::default()
963 },
964 payload: None,
965 }
966 }
967
968 pub fn pong() -> Message {
969 Message {
970 command: proto::BaseCommand {
971 r#type: CommandType::Pong as i32,
972 pong: Some(proto::CommandPong {}),
973 ..Default::default()
974 },
975 payload: None,
976 }
977 }
978
979 pub fn create_producer(
980 topic: String,
981 producer_name: Option<String>,
982 producer_id: u64,
983 request_id: u64,
984 options: ProducerOptions,
985 ) -> Message {
986 Message {
987 command: proto::BaseCommand {
988 r#type: CommandType::Producer as i32,
989 producer: Some(proto::CommandProducer {
990 topic,
991 producer_id,
992 request_id,
993 producer_name,
994 encrypted: options.encrypted,
995 metadata: options
996 .metadata
997 .iter()
998 .map(|(k, v)| proto::KeyValue {
999 key: k.clone(),
1000 value: v.clone(),
1001 })
1002 .collect(),
1003 schema: options.schema,
1004 ..Default::default()
1005 }),
1006 ..Default::default()
1007 },
1008 payload: None,
1009 }
1010 }
1011
1012 pub fn get_topics_of_namespace(
1013 request_id: u64,
1014 namespace: String,
1015 mode: proto::command_get_topics_of_namespace::Mode,
1016 ) -> Message {
1017 Message {
1018 command: proto::BaseCommand {
1019 r#type: CommandType::GetTopicsOfNamespace as i32,
1020 get_topics_of_namespace: Some(proto::CommandGetTopicsOfNamespace {
1021 request_id,
1022 namespace,
1023 mode: Some(mode as i32),
1024 }),
1025 ..Default::default()
1026 },
1027 payload: None,
1028 }
1029 }
1030
1031 pub(crate) fn send(
1032 producer_id: u64,
1033 producer_name: String,
1034 sequence_id: u64,
1035 message: producer::ProducerMessage,
1036 ) -> Message {
1037 let properties = message
1038 .properties
1039 .into_iter()
1040 .map(|(key, value)| proto::KeyValue { key, value })
1041 .collect();
1042
1043 Message {
1044 command: proto::BaseCommand {
1045 r#type: CommandType::Send as i32,
1046 send: Some(proto::CommandSend {
1047 producer_id,
1048 sequence_id,
1049 num_messages: message.num_messages_in_batch,
1050 ..Default::default()
1051 }),
1052 ..Default::default()
1053 },
1054 payload: Some(Payload {
1055 metadata: proto::MessageMetadata {
1056 producer_name,
1057 sequence_id,
1058 properties,
1059 publish_time: Utc::now().timestamp_millis() as u64,
1060 replicated_from: None,
1061 partition_key: message.partition_key,
1062 replicate_to: message.replicate_to,
1063 compression: message.compression,
1064 uncompressed_size: message.uncompressed_size,
1065 num_messages_in_batch: message.num_messages_in_batch,
1066 event_time: message.event_time,
1067 encryption_keys: message.encryption_keys,
1068 encryption_algo: message.encryption_algo,
1069 encryption_param: message.encryption_param,
1070 schema_version: message.schema_version,
1071 deliver_at_time: message.deliver_at_time,
1072 ..Default::default()
1073 },
1074 data: message.payload,
1075 }),
1076 }
1077 }
1078
1079 pub fn lookup_topic(topic: String, authoritative: bool, request_id: u64) -> Message {
1080 Message {
1081 command: proto::BaseCommand {
1082 r#type: CommandType::Lookup as i32,
1083 lookup_topic: Some(proto::CommandLookupTopic {
1084 topic,
1085 request_id,
1086 authoritative: Some(authoritative),
1087 ..Default::default()
1088 }),
1089 ..Default::default()
1090 },
1091 payload: None,
1092 }
1093 }
1094
1095 pub fn lookup_partitioned_topic(topic: String, request_id: u64) -> Message {
1096 Message {
1097 command: proto::BaseCommand {
1098 r#type: CommandType::PartitionedMetadata as i32,
1099 partition_metadata: Some(proto::CommandPartitionedTopicMetadata {
1100 topic,
1101 request_id,
1102 ..Default::default()
1103 }),
1104 ..Default::default()
1105 },
1106 payload: None,
1107 }
1108 }
1109
1110 pub fn close_producer(producer_id: u64, request_id: u64) -> Message {
1111 Message {
1112 command: proto::BaseCommand {
1113 r#type: CommandType::CloseProducer as i32,
1114 close_producer: Some(proto::CommandCloseProducer {
1115 producer_id,
1116 request_id,
1117 }),
1118 ..Default::default()
1119 },
1120 payload: None,
1121 }
1122 }
1123
1124 pub fn subscribe(
1125 topic: String,
1126 subscription: String,
1127 sub_type: SubType,
1128 consumer_id: u64,
1129 request_id: u64,
1130 consumer_name: Option<String>,
1131 options: ConsumerOptions,
1132 ) -> Message {
1133 Message {
1134 command: proto::BaseCommand {
1135 r#type: CommandType::Subscribe as i32,
1136 subscribe: Some(proto::CommandSubscribe {
1137 topic,
1138 subscription,
1139 sub_type: sub_type as i32,
1140 consumer_id,
1141 request_id,
1142 consumer_name,
1143 priority_level: options.priority_level,
1144 durable: options.durable,
1145 metadata: options
1146 .metadata
1147 .iter()
1148 .map(|(k, v)| proto::KeyValue {
1149 key: k.clone(),
1150 value: v.clone(),
1151 })
1152 .collect(),
1153 read_compacted: Some(options.read_compacted.unwrap_or(false)),
1154 initial_position: Some(options.initial_position.into()),
1155 schema: options.schema,
1156 start_message_id: options.start_message_id,
1157 ..Default::default()
1158 }),
1159 ..Default::default()
1160 },
1161 payload: None,
1162 }
1163 }
1164
1165 pub fn flow(consumer_id: u64, message_permits: u32) -> Message {
1166 Message {
1167 command: proto::BaseCommand {
1168 r#type: CommandType::Flow as i32,
1169 flow: Some(proto::CommandFlow {
1170 consumer_id,
1171 message_permits,
1172 }),
1173 ..Default::default()
1174 },
1175 payload: None,
1176 }
1177 }
1178
1179 pub fn ack(
1180 consumer_id: u64,
1181 message_id: Vec<proto::MessageIdData>,
1182 cumulative: bool,
1183 ) -> Message {
1184 Message {
1185 command: proto::BaseCommand {
1186 r#type: CommandType::Ack as i32,
1187 ack: Some(proto::CommandAck {
1188 consumer_id,
1189 ack_type: if cumulative {
1190 proto::command_ack::AckType::Cumulative as i32
1191 } else {
1192 proto::command_ack::AckType::Individual as i32
1193 },
1194 message_id,
1195 validation_error: None,
1196 properties: Vec::new(),
1197 ..Default::default()
1198 }),
1199 ..Default::default()
1200 },
1201 payload: None,
1202 }
1203 }
1204
1205 pub fn redeliver_unacknowleged_messages(
1206 consumer_id: u64,
1207 message_ids: Vec<proto::MessageIdData>,
1208 ) -> Message {
1209 Message {
1210 command: proto::BaseCommand {
1211 r#type: CommandType::RedeliverUnacknowledgedMessages as i32,
1212 redeliver_unacknowledged_messages: Some(
1213 proto::CommandRedeliverUnacknowledgedMessages {
1214 consumer_id,
1215 message_ids,
1216 },
1217 ),
1218 ..Default::default()
1219 },
1220 payload: None,
1221 }
1222 }
1223
1224 pub fn close_consumer(consumer_id: u64, request_id: u64) -> Message {
1225 Message {
1226 command: proto::BaseCommand {
1227 r#type: CommandType::CloseConsumer as i32,
1228 close_consumer: Some(proto::CommandCloseConsumer {
1229 consumer_id,
1230 request_id,
1231 }),
1232 ..Default::default()
1233 },
1234 payload: None,
1235 }
1236 }
1237
1238 pub fn seek(
1239 consumer_id: u64,
1240 request_id: u64,
1241 message_id: Option<MessageIdData>,
1242 message_publish_time: Option<u64>,
1243 ) -> Message {
1244 Message {
1245 command: proto::BaseCommand {
1246 r#type: CommandType::Seek as i32,
1247 seek: Some(proto::CommandSeek {
1248 consumer_id,
1249 request_id,
1250 message_id,
1251 message_publish_time,
1252 }),
1253 ..Default::default()
1254 },
1255 payload: None,
1256 }
1257 }
1258
1259 pub fn unsubscribe(consumer_id: u64, request_id: u64) -> Message {
1260 Message {
1261 command: proto::BaseCommand {
1262 r#type: CommandType::Unsubscribe as i32,
1263 unsubscribe: Some(proto::CommandUnsubscribe {
1264 consumer_id,
1265 request_id,
1266 }),
1267 ..Default::default()
1268 },
1269 payload: None,
1270 }
1271 }
1272}