pulsar/
connection.rs

1use native_tls::Certificate;
2use proto::MessageIdData;
3use rand::{thread_rng, Rng};
4use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::sync::{
9    atomic::{AtomicUsize, Ordering},
10    Arc,
11};
12use std::time::Duration;
13
14use futures::{
15    self,
16    channel::{mpsc, oneshot},
17    future::{select, Either},
18    pin_mut,
19    task::{Context, Poll},
20    Future, FutureExt, Sink, SinkExt, Stream, StreamExt,
21};
22use url::Url;
23
24use crate::consumer::ConsumerOptions;
25use crate::error::{ConnectionError, SharedError, AuthenticationError};
26use crate::executor::{Executor, ExecutorKind};
27use crate::message::{
28    proto::{self, command_subscribe::SubType},
29    BaseCommand, Codec, Message,
30};
31use crate::producer::{self, ProducerOptions};
32use async_trait::async_trait;
33use futures::lock::Mutex;
34
35pub(crate) enum Register {
36    Request {
37        key: RequestKey,
38        resolver: oneshot::Sender<Message>,
39    },
40    Consumer {
41        consumer_id: u64,
42        resolver: mpsc::UnboundedSender<Message>,
43    },
44    Ping {
45        resolver: oneshot::Sender<()>,
46    },
47}
48
49/// identifier for a message
50#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq)]
51pub enum RequestKey {
52    RequestId(u64),
53    ProducerSend { producer_id: u64, sequence_id: u64 },
54    Consumer { consumer_id: u64 },
55    CloseConsumer { consumer_id: u64, request_id: u64 },
56}
57
58/// Authentication parameters
59#[derive(Clone)]
60pub struct Authentication {
61    /// Authentication kid. Use "token" for JWT
62    pub name: String,
63    /// Authentication data
64    pub data: Vec<u8>,
65}
66
67#[async_trait]
68impl crate::authentication::Authentication for Authentication {
69    fn auth_method_name(&self) -> String {
70        self.name.clone()
71    }
72
73    async fn initialize(&mut self) -> Result<(), AuthenticationError> {
74        Ok(())
75    }
76
77    async fn auth_data(&mut self) -> Result<Vec<u8>, AuthenticationError> {
78        Ok(self.data.clone())
79    }
80}
81
82pub(crate) struct Receiver<S: Stream<Item = Result<Message, ConnectionError>>> {
83    inbound: Pin<Box<S>>,
84    outbound: mpsc::UnboundedSender<Message>,
85    error: SharedError,
86    pending_requests: BTreeMap<RequestKey, oneshot::Sender<Message>>,
87    consumers: BTreeMap<u64, mpsc::UnboundedSender<Message>>,
88    received_messages: BTreeMap<RequestKey, Message>,
89    registrations: Pin<Box<mpsc::UnboundedReceiver<Register>>>,
90    shutdown: Pin<Box<oneshot::Receiver<()>>>,
91    ping: Option<oneshot::Sender<()>>,
92}
93
94impl<S: Stream<Item = Result<Message, ConnectionError>>> Receiver<S> {
95    pub fn new(
96        inbound: S,
97        outbound: mpsc::UnboundedSender<Message>,
98        error: SharedError,
99        registrations: mpsc::UnboundedReceiver<Register>,
100        shutdown: oneshot::Receiver<()>,
101    ) -> Receiver<S> {
102        Receiver {
103            inbound: Box::pin(inbound),
104            outbound,
105            error,
106            pending_requests: BTreeMap::new(),
107            received_messages: BTreeMap::new(),
108            consumers: BTreeMap::new(),
109            registrations: Box::pin(registrations),
110            shutdown: Box::pin(shutdown),
111            ping: None,
112        }
113    }
114}
115
116impl<S: Stream<Item = Result<Message, ConnectionError>>> Future for Receiver<S> {
117    type Output = Result<(), ()>;
118
119    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
120        match self.shutdown.as_mut().poll(cx) {
121            Poll::Ready(Ok(())) | Poll::Ready(Err(futures::channel::oneshot::Canceled)) => {
122                return Poll::Ready(Err(()))
123            }
124            Poll::Pending => {}
125        }
126
127        //Are we worried about starvation here?
128        loop {
129            match self.registrations.as_mut().poll_next(cx) {
130                Poll::Ready(Some(Register::Request { key, resolver })) => {
131                    match self.received_messages.remove(&key) {
132                        Some(msg) => {
133                            let _ = resolver.send(msg);
134                        }
135                        None => {
136                            self.pending_requests.insert(key, resolver);
137                        }
138                    }
139                }
140                Poll::Ready(Some(Register::Consumer {
141                    consumer_id,
142                    resolver,
143                })) => {
144                    self.consumers.insert(consumer_id, resolver);
145                }
146                Poll::Ready(Some(Register::Ping { resolver })) => {
147                    self.ping = Some(resolver);
148                }
149                Poll::Ready(None) => {
150                    self.error.set(ConnectionError::Disconnected);
151                    return Poll::Ready(Err(()));
152                }
153                Poll::Pending => break,
154            }
155        }
156
157        loop {
158            match self.inbound.as_mut().poll_next(cx) {
159                Poll::Ready(Some(Ok(msg))) => match msg {
160                    Message {
161                        command: BaseCommand { ping: Some(_), .. },
162                        ..
163                    } => {
164                        let _ = self.outbound.unbounded_send(messages::pong());
165                    }
166                    Message {
167                        command: BaseCommand { pong: Some(_), .. },
168                        ..
169                    } => {
170                        if let Some(sender) = self.ping.take() {
171                            let _ = sender.send(());
172                        }
173                    }
174                    msg => match msg.request_key() {
175                        Some(key @ RequestKey::RequestId(_))
176                        | Some(key @ RequestKey::ProducerSend { .. }) => {
177                            if let Some(resolver) = self.pending_requests.remove(&key) {
178                                // We don't care if the receiver has dropped their future
179                                let _ = resolver.send(msg);
180                            } else {
181                                self.received_messages.insert(key, msg);
182                            }
183                        }
184                        Some(RequestKey::Consumer { consumer_id }) => {
185                            let _ = self
186                                .consumers
187                                .get_mut(&consumer_id)
188                                .map(move |consumer| consumer.unbounded_send(msg));
189                        }
190                        Some(RequestKey::CloseConsumer {
191                            consumer_id,
192                            request_id,
193                        }) => {
194                            // FIXME: could the registration still be in queue while we get the
195                            // CloseConsumer message?
196                            if let Some(resolver) = self
197                                .pending_requests
198                                .remove(&RequestKey::RequestId(request_id))
199                            {
200                                // We don't care if the receiver has dropped their future
201                                let _ = resolver.send(msg);
202                            } else {
203                                let res = self
204                                    .consumers
205                                    .get_mut(&consumer_id)
206                                    .map(move |consumer| consumer.unbounded_send(msg));
207
208                                if !res.as_ref().map(|r| r.is_ok()).unwrap_or(false) {
209                                    error!("ConnectionReceiver: error transmitting message to consumer: {:?}", res);
210                                }
211                            }
212                        }
213                        None => {
214                            warn!(
215                                "Received unexpected message; dropping. Message {:?}",
216                                msg.command
217                            )
218                        }
219                    },
220                },
221                Poll::Ready(None) => {
222                    self.error.set(ConnectionError::Disconnected);
223                    return Poll::Ready(Err(()));
224                }
225                Poll::Pending => return Poll::Pending,
226                Poll::Ready(Some(Err(e))) => {
227                    self.error.set(e);
228                    return Poll::Ready(Err(()));
229                }
230            }
231        }
232    }
233}
234
235#[derive(Clone)]
236pub struct SerialId(Arc<AtomicUsize>);
237
238impl Default for SerialId {
239    fn default() -> Self {
240        SerialId(Arc::new(AtomicUsize::new(0)))
241    }
242}
243
244impl SerialId {
245    pub fn new() -> Self {
246        Self::default()
247    }
248    pub fn get(&self) -> u64 {
249        self.0.fetch_add(1, Ordering::Relaxed) as u64
250    }
251}
252
253/// An owned type that can send messages like a connection
254//#[derive(Clone)]
255pub struct ConnectionSender<Exe: Executor> {
256    tx: mpsc::UnboundedSender<Message>,
257    registrations: mpsc::UnboundedSender<Register>,
258    receiver_shutdown: Option<oneshot::Sender<()>>,
259    request_id: SerialId,
260    error: SharedError,
261    executor: Arc<Exe>,
262    operation_timeout: Duration,
263}
264
265impl<Exe: Executor> ConnectionSender<Exe> {
266    pub(crate) fn new(
267        tx: mpsc::UnboundedSender<Message>,
268        registrations: mpsc::UnboundedSender<Register>,
269        receiver_shutdown: oneshot::Sender<()>,
270        request_id: SerialId,
271        error: SharedError,
272        executor: Arc<Exe>,
273        operation_timeout: Duration,
274    ) -> ConnectionSender<Exe> {
275        ConnectionSender {
276            tx,
277            registrations,
278            receiver_shutdown: Some(receiver_shutdown),
279            request_id,
280            error,
281            executor,
282            operation_timeout,
283        }
284    }
285
286    pub(crate) async fn send(
287        &self,
288        producer_id: u64,
289        producer_name: String,
290        sequence_id: u64,
291        message: producer::ProducerMessage,
292    ) -> Result<proto::CommandSendReceipt, ConnectionError> {
293        let key = RequestKey::ProducerSend {
294            producer_id,
295            sequence_id,
296        };
297        let msg = messages::send(producer_id, producer_name, sequence_id, message);
298        self.send_message(msg, key, |resp| resp.command.send_receipt)
299            .await
300    }
301
302    pub async fn send_ping(&self) -> Result<(), ConnectionError> {
303        let (resolver, response) = oneshot::channel();
304        trace!("sending ping");
305
306        match (
307            self.registrations
308                .unbounded_send(Register::Ping { resolver }),
309            self.tx.unbounded_send(messages::ping()),
310        ) {
311            (Ok(_), Ok(_)) => {
312                let delay_f = self.executor.delay(self.operation_timeout);
313                pin_mut!(response);
314                pin_mut!(delay_f);
315
316                match select(response, delay_f).await {
317                    Either::Left((res, _)) => res
318                        .map_err(|oneshot::Canceled| {
319                            self.error.set(ConnectionError::Disconnected);
320                            ConnectionError::Disconnected
321                        })
322                        .map(move |_| trace!("received pong")),
323                    Either::Right(_) => {
324                        self.error.set(ConnectionError::Io(std::io::Error::new(
325                            std::io::ErrorKind::TimedOut,
326                            "timeout when sending ping to the Pulsar server",
327                        )));
328                        Err(ConnectionError::Io(std::io::Error::new(
329                            std::io::ErrorKind::TimedOut,
330                            "timeout when sending ping to the Pulsar server",
331                        )))
332                    }
333                }
334            }
335            _ => Err(ConnectionError::Disconnected),
336        }
337    }
338
339    pub async fn lookup_topic<S: Into<String>>(
340        &self,
341        topic: S,
342        authoritative: bool,
343    ) -> Result<proto::CommandLookupTopicResponse, ConnectionError> {
344        let request_id = self.request_id.get();
345        let msg = messages::lookup_topic(topic.into(), authoritative, request_id);
346        self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
347            resp.command.lookup_topic_response
348        })
349        .await
350    }
351
352    pub async fn lookup_partitioned_topic<S: Into<String>>(
353        &self,
354        topic: S,
355    ) -> Result<proto::CommandPartitionedTopicMetadataResponse, ConnectionError> {
356        let request_id = self.request_id.get();
357        let msg = messages::lookup_partitioned_topic(topic.into(), request_id);
358        self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
359            resp.command.partition_metadata_response
360        })
361        .await
362    }
363
364    pub async fn create_producer(
365        &self,
366        topic: String,
367        producer_id: u64,
368        producer_name: Option<String>,
369        options: ProducerOptions,
370    ) -> Result<proto::CommandProducerSuccess, ConnectionError> {
371        let request_id = self.request_id.get();
372        let msg = messages::create_producer(topic, producer_name, producer_id, request_id, options);
373        self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
374            resp.command.producer_success
375        })
376        .await
377    }
378
379    pub async fn get_topics_of_namespace(
380        &self,
381        namespace: String,
382        mode: proto::command_get_topics_of_namespace::Mode,
383    ) -> Result<proto::CommandGetTopicsOfNamespaceResponse, ConnectionError> {
384        let request_id = self.request_id.get();
385        let msg = messages::get_topics_of_namespace(request_id, namespace, mode);
386        self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
387            resp.command.get_topics_of_namespace_response
388        })
389        .await
390    }
391
392    pub async fn close_producer(
393        &self,
394        producer_id: u64,
395    ) -> Result<proto::CommandSuccess, ConnectionError> {
396        let request_id = self.request_id.get();
397        let msg = messages::close_producer(producer_id, request_id);
398        self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
399            resp.command.success
400        })
401        .await
402    }
403
404    pub async fn subscribe(
405        &self,
406        resolver: mpsc::UnboundedSender<Message>,
407        topic: String,
408        subscription: String,
409        sub_type: SubType,
410        consumer_id: u64,
411        consumer_name: Option<String>,
412        options: ConsumerOptions,
413    ) -> Result<proto::CommandSuccess, ConnectionError> {
414        let request_id = self.request_id.get();
415        let msg = messages::subscribe(
416            topic,
417            subscription,
418            sub_type,
419            consumer_id,
420            request_id,
421            consumer_name,
422            options,
423        );
424        match self.registrations.unbounded_send(Register::Consumer {
425            consumer_id,
426            resolver,
427        }) {
428            Ok(_) => {}
429            Err(_) => {
430                self.error.set(ConnectionError::Disconnected);
431                return Err(ConnectionError::Disconnected);
432            }
433        }
434        self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
435            resp.command.success
436        })
437        .await
438    }
439
440    pub fn send_flow(&self, consumer_id: u64, message_permits: u32) -> Result<(), ConnectionError> {
441        self.tx
442            .unbounded_send(messages::flow(consumer_id, message_permits))
443            .map_err(|_| ConnectionError::Disconnected)
444    }
445
446    pub fn send_ack(
447        &self,
448        consumer_id: u64,
449        message_ids: Vec<proto::MessageIdData>,
450        cumulative: bool,
451    ) -> Result<(), ConnectionError> {
452        self.tx
453            .unbounded_send(messages::ack(consumer_id, message_ids, cumulative))
454            .map_err(|_| ConnectionError::Disconnected)
455    }
456
457    pub fn send_redeliver_unacknowleged_messages(
458        &self,
459        consumer_id: u64,
460        message_ids: Vec<proto::MessageIdData>,
461    ) -> Result<(), ConnectionError> {
462        self.tx
463            .unbounded_send(messages::redeliver_unacknowleged_messages(
464                consumer_id,
465                message_ids,
466            ))
467            .map_err(|_| ConnectionError::Disconnected)
468    }
469
470    pub async fn close_consumer(
471        &self,
472        consumer_id: u64,
473    ) -> Result<proto::CommandSuccess, ConnectionError> {
474        let request_id = self.request_id.get();
475        let msg = messages::close_consumer(consumer_id, request_id);
476        self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
477            resp.command.success
478        })
479        .await
480    }
481
482    pub async fn seek(
483        &self,
484        consumer_id: u64,
485        message_id: Option<MessageIdData>,
486        timestamp: Option<u64>,
487    ) -> Result<proto::CommandSuccess, ConnectionError> {
488        let request_id = self.request_id.get();
489        let msg = messages::seek(consumer_id, request_id, message_id, timestamp);
490        self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
491            resp.command.success
492        })
493        .await
494    }
495
496    pub async fn unsubscribe(
497        &self,
498        consumer_id: u64,
499    ) -> Result<proto::CommandSuccess, ConnectionError> {
500        let request_id = self.request_id.get();
501        let msg = messages::unsubscribe(consumer_id, request_id);
502        self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
503            resp.command.success
504        })
505        .await
506    }
507
508    async fn send_message<R: Debug, F>(
509        &self,
510        msg: Message,
511        key: RequestKey,
512        extract: F,
513    ) -> Result<R, ConnectionError>
514    where
515        F: FnOnce(Message) -> Option<R>,
516    {
517        let (resolver, response) = oneshot::channel();
518        trace!("sending message(key = {:?}): {:?}", key, msg);
519
520        let k = key.clone();
521        let response = async {
522            response
523                .await
524                .map_err(|oneshot::Canceled| {
525                    self.error.set(ConnectionError::Disconnected);
526                    ConnectionError::Disconnected
527                })
528                .map(move |message: Message| {
529                    trace!("received message(key = {:?}): {:?}", k, message);
530                    extract_message(message, extract)
531                })?
532        };
533
534        match (
535            self.registrations
536                .unbounded_send(Register::Request { key, resolver }),
537            self.tx.unbounded_send(msg),
538        ) {
539            (Ok(_), Ok(_)) => {
540                let delay_f = self.executor.delay(self.operation_timeout);
541                pin_mut!(response);
542                pin_mut!(delay_f);
543
544                match select(response, delay_f).await {
545                    Either::Left((res, _)) => res,
546                    Either::Right(_) => Err(ConnectionError::Io(std::io::Error::new(
547                        std::io::ErrorKind::TimedOut,
548                        "timeout sending message to the Pulsar server",
549                    ))),
550                }
551            }
552            _ => Err(ConnectionError::Disconnected),
553        }
554    }
555}
556
557pub struct Connection<Exe: Executor> {
558    id: i64,
559    url: Url,
560    sender: ConnectionSender<Exe>,
561}
562
563impl<Exe: Executor> Connection<Exe> {
564    pub async fn new(
565        url: Url,
566        auth_data: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
567        proxy_to_broker_url: Option<String>,
568        certificate_chain: &[Certificate],
569        allow_insecure_connection: bool,
570        tls_hostname_verification_enabled: bool,
571        connection_timeout: Duration,
572        operation_timeout: Duration,
573        executor: Arc<Exe>,
574    ) -> Result<Connection<Exe>, ConnectionError> {
575        if url.scheme() != "pulsar" && url.scheme() != "pulsar+ssl" {
576            error!("invalid scheme: {}", url.scheme());
577            return Err(ConnectionError::NotFound);
578        }
579        let hostname = url.host().map(|s| s.to_string());
580
581        let tls = match url.scheme() {
582            "pulsar" => false,
583            "pulsar+ssl" => true,
584            s => {
585                error!("invalid scheme: {}", s);
586                return Err(ConnectionError::NotFound);
587            }
588        };
589
590        let u = url.clone();
591        let address: SocketAddr = match executor
592            .spawn_blocking(move || {
593                u.socket_addrs(|| match u.scheme() {
594                    "pulsar" => Some(6650),
595                    "pulsar+ssl" => Some(6651),
596                    _ => None,
597                })
598                .map_err(|e| {
599                    error!("could not look up address: {:?}", e);
600                    e
601                })
602                .ok()
603                .and_then(|v| {
604                    let mut rng = thread_rng();
605                    let index: usize = rng.gen_range(0..v.len());
606                    v.get(index).copied()
607                })
608            })
609            .await
610        {
611            Some(Some(address)) => address,
612            _ =>
613            //return Err(Error::Custom(format!("could not query address: {}", url))),
614            {
615                return Err(ConnectionError::NotFound)
616            }
617        };
618
619        let hostname = hostname.unwrap_or_else(|| address.ip().to_string());
620
621        debug!("Connecting to {}: {}", url, address);
622        let sender_prepare = Connection::prepare_stream(
623            address,
624            hostname,
625            tls,
626            auth_data,
627            proxy_to_broker_url,
628            certificate_chain,
629            allow_insecure_connection,
630            tls_hostname_verification_enabled,
631            executor.clone(),
632            operation_timeout,
633        );
634        let delay_f = executor.delay(connection_timeout);
635
636        pin_mut!(sender_prepare);
637        pin_mut!(delay_f);
638
639        let sender;
640        match select(sender_prepare, delay_f).await {
641            Either::Left((res, _)) => sender = res?,
642            Either::Right(_) => {
643                return Err(ConnectionError::Io(std::io::Error::new(
644                    std::io::ErrorKind::TimedOut,
645                    "timeout connecting to the Pulsar server",
646                )));
647            }
648        };
649
650        let id = rand::random();
651        Ok(Connection { id, url, sender })
652    }
653
654    async fn prepare_auth_data(auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>)
655        -> Result<Option<Authentication>, ConnectionError> {
656        match auth {
657            Some(m_auth) => {
658                let mut auth_guard = m_auth.lock().await;
659                Ok(Some(Authentication {
660                    name: auth_guard.auth_method_name(),
661                    data: auth_guard.auth_data().await?,
662                }))
663            }
664            None => Ok(None)
665        }
666    }
667
668    async fn prepare_stream(
669        address: SocketAddr,
670        hostname: String,
671        tls: bool,
672        auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
673        proxy_to_broker_url: Option<String>,
674        certificate_chain: &[Certificate],
675        allow_insecure_connection: bool,
676        tls_hostname_verification_enabled: bool,
677        executor: Arc<Exe>,
678        operation_timeout: Duration,
679    ) -> Result<ConnectionSender<Exe>, ConnectionError> {
680        match executor.kind() {
681            #[cfg(feature = "tokio-runtime")]
682            ExecutorKind::Tokio => {
683                if tls {
684                    let stream = tokio::net::TcpStream::connect(&address).await?;
685
686                    let mut builder = native_tls::TlsConnector::builder();
687                    for certificate in certificate_chain {
688                        builder.add_root_certificate(certificate.clone());
689                    }
690                    builder.danger_accept_invalid_hostnames(
691                        allow_insecure_connection && !tls_hostname_verification_enabled,
692                    );
693                    builder.danger_accept_invalid_certs(allow_insecure_connection);
694                    let cx = builder.build()?;
695                    let cx = tokio_native_tls::TlsConnector::from(cx);
696                    let stream = cx
697                        .connect(&hostname, stream)
698                        .await
699                        .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?;
700
701                    Connection::connect(
702                        stream,
703                        Self::prepare_auth_data(auth).await?,
704                        proxy_to_broker_url,
705                        executor,
706                        operation_timeout,
707                    )
708                    .await
709                } else {
710                    let stream = tokio::net::TcpStream::connect(&address)
711                        .await
712                        .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?;
713
714                    Connection::connect(
715                        stream,
716                        Self::prepare_auth_data(auth).await?,
717                        proxy_to_broker_url,
718                        executor,
719                        operation_timeout,
720                    )
721                    .await
722                }
723            }
724            #[cfg(not(feature = "tokio-runtime"))]
725            ExecutorKind::Tokio => {
726                unimplemented!("the tokio-runtime cargo feature is not active");
727            }
728            #[cfg(feature = "async-std-runtime")]
729            ExecutorKind::AsyncStd => {
730                if tls {
731                    let stream = async_std::net::TcpStream::connect(&address).await?;
732                    let mut connector = async_native_tls::TlsConnector::new();
733                    for certificate in certificate_chain {
734                        connector = connector.add_root_certificate(certificate.clone());
735                    }
736                    connector = connector.danger_accept_invalid_hostnames(
737                        allow_insecure_connection && !tls_hostname_verification_enabled,
738                    );
739                    connector = connector.danger_accept_invalid_certs(allow_insecure_connection);
740                    let stream = connector
741                        .connect(&hostname, stream)
742                        .await
743                        .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;
744
745                    Connection::connect(
746                        stream,
747                        Self::prepare_auth_data(auth).await?,
748                        proxy_to_broker_url,
749                        executor,
750                        operation_timeout,
751                    )
752                    .await
753                } else {
754                    let stream = async_std::net::TcpStream::connect(&address)
755                        .await
756                        .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;
757
758                    Connection::connect(
759                        stream,
760                        Self::prepare_auth_data(auth).await?,
761                        proxy_to_broker_url,
762                        executor,
763                        operation_timeout,
764                    )
765                    .await
766                }
767            }
768            #[cfg(not(feature = "async-std-runtime"))]
769            ExecutorKind::AsyncStd => {
770                unimplemented!("the async-std-runtime cargo feature is not active");
771            }
772        }
773    }
774
775    pub async fn connect<S>(
776        mut stream: S,
777        auth_data: Option<Authentication>,
778        proxy_to_broker_url: Option<String>,
779        executor: Arc<Exe>,
780        operation_timeout: Duration,
781    ) -> Result<ConnectionSender<Exe>, ConnectionError>
782    where
783        S: Stream<Item = Result<Message, ConnectionError>>,
784        S: Sink<Message, Error = ConnectionError>,
785        S: Send + std::marker::Unpin + 'static,
786    {
787        let _ = stream
788            .send({
789                let msg = messages::connect(auth_data, proxy_to_broker_url);
790                trace!("connection message: {:?}", msg);
791                msg
792            })
793            .await?;
794
795        let msg = stream.next().await;
796        match msg {
797            Some(Ok(Message {
798                command:
799                    proto::BaseCommand {
800                        error: Some(error), ..
801                    },
802                ..
803            })) => Err(ConnectionError::PulsarError(
804                crate::error::server_error(error.error),
805                Some(error.message),
806            )),
807            Some(Ok(msg)) => {
808                let cmd = msg.command.clone();
809                trace!("received connection response: {:?}", msg);
810                msg.command.connected.ok_or_else(|| {
811                    ConnectionError::Unexpected(format!(
812                        "Unexpected message from pulsar: {:?}",
813                        cmd
814                    ))
815                })
816            }
817            Some(Err(e)) => Err(e),
818            None => Err(ConnectionError::Disconnected),
819        }?;
820
821        let (mut sink, stream) = stream.split();
822        let (tx, mut rx) = mpsc::unbounded();
823        let (registrations_tx, registrations_rx) = mpsc::unbounded();
824        let error = SharedError::new();
825        let (receiver_shutdown_tx, receiver_shutdown_rx) = oneshot::channel();
826
827        if executor
828            .spawn(Box::pin(
829                Receiver::new(
830                    stream,
831                    tx.clone(),
832                    error.clone(),
833                    registrations_rx,
834                    receiver_shutdown_rx,
835                )
836                .map(|_| ()),
837            ))
838            .is_err()
839        {
840            error!("the executor could not spawn the Receiver future");
841            return Err(ConnectionError::Shutdown);
842        }
843
844        let err = error.clone();
845        let res = executor.spawn(Box::pin(async move {
846            while let Some(msg) = rx.next().await {
847                if let Err(e) = sink.send(msg).await {
848                    err.set(e);
849                    break;
850                }
851            }
852        }));
853        if res.is_err() {
854            error!("the executor could not spawn the Receiver future");
855            return Err(ConnectionError::Shutdown);
856        }
857
858        let sender = ConnectionSender::new(
859            tx,
860            registrations_tx,
861            receiver_shutdown_tx,
862            SerialId::new(),
863            error,
864            executor.clone(),
865            operation_timeout,
866        );
867
868        Ok(sender)
869    }
870
871    pub fn id(&self) -> i64 {
872        self.id
873    }
874
875    pub fn error(&self) -> Option<ConnectionError> {
876        self.sender.error.remove()
877    }
878
879    pub fn is_valid(&self) -> bool {
880        !self.sender.error.is_set()
881    }
882
883    pub fn url(&self) -> &Url {
884        &self.url
885    }
886
887    /// Chain to send a message, e.g. conn.sender().send_ping()
888    pub fn sender(&self) -> &ConnectionSender<Exe> {
889        &self.sender
890    }
891}
892
893impl<Exe: Executor> Drop for Connection<Exe> {
894    fn drop(&mut self) {
895        trace!("dropping connection {} for {}", self.id, self.url);
896        if let Some(shutdown) = self.sender.receiver_shutdown.take() {
897            let _ = shutdown.send(());
898        }
899    }
900}
901
902fn extract_message<T: Debug, F>(message: Message, extract: F) -> Result<T, ConnectionError>
903where
904    F: FnOnce(Message) -> Option<T>,
905{
906    if let Some(e) = message.command.error {
907        Err(ConnectionError::PulsarError(
908            crate::error::server_error(e.error),
909            Some(e.message),
910        ))
911    } else {
912        let cmd = message.command.clone();
913        if let Some(extracted) = extract(message) {
914            trace!("extracted message: {:?}", extracted);
915            Ok(extracted)
916        } else {
917            Err(ConnectionError::UnexpectedResponse(format!("{:?}", cmd)))
918        }
919    }
920}
921
922pub(crate) mod messages {
923    use chrono::Utc;
924    use proto::MessageIdData;
925
926    use crate::connection::Authentication;
927    use crate::consumer::ConsumerOptions;
928    use crate::message::{
929        proto::{self, base_command::Type as CommandType, command_subscribe::SubType},
930        Message, Payload,
931    };
932    use crate::producer::{self, ProducerOptions};
933
934    pub fn connect(auth: Option<Authentication>, proxy_to_broker_url: Option<String>) -> Message {
935        let (auth_method_name, auth_data) = match auth {
936            Some(auth) => (Some(auth.name), Some(auth.data)),
937            None => (None, None),
938        };
939
940        Message {
941            command: proto::BaseCommand {
942                r#type: CommandType::Connect as i32,
943                connect: Some(proto::CommandConnect {
944                    auth_method_name,
945                    auth_data,
946                    proxy_to_broker_url,
947                    client_version: String::from("2.0.1-incubating"),
948                    protocol_version: Some(12),
949                    ..Default::default()
950                }),
951                ..Default::default()
952            },
953            payload: None,
954        }
955    }
956
957    pub fn ping() -> Message {
958        Message {
959            command: proto::BaseCommand {
960                r#type: CommandType::Ping as i32,
961                ping: Some(proto::CommandPing {}),
962                ..Default::default()
963            },
964            payload: None,
965        }
966    }
967
968    pub fn pong() -> Message {
969        Message {
970            command: proto::BaseCommand {
971                r#type: CommandType::Pong as i32,
972                pong: Some(proto::CommandPong {}),
973                ..Default::default()
974            },
975            payload: None,
976        }
977    }
978
979    pub fn create_producer(
980        topic: String,
981        producer_name: Option<String>,
982        producer_id: u64,
983        request_id: u64,
984        options: ProducerOptions,
985    ) -> Message {
986        Message {
987            command: proto::BaseCommand {
988                r#type: CommandType::Producer as i32,
989                producer: Some(proto::CommandProducer {
990                    topic,
991                    producer_id,
992                    request_id,
993                    producer_name,
994                    encrypted: options.encrypted,
995                    metadata: options
996                        .metadata
997                        .iter()
998                        .map(|(k, v)| proto::KeyValue {
999                            key: k.clone(),
1000                            value: v.clone(),
1001                        })
1002                        .collect(),
1003                    schema: options.schema,
1004                    ..Default::default()
1005                }),
1006                ..Default::default()
1007            },
1008            payload: None,
1009        }
1010    }
1011
1012    pub fn get_topics_of_namespace(
1013        request_id: u64,
1014        namespace: String,
1015        mode: proto::command_get_topics_of_namespace::Mode,
1016    ) -> Message {
1017        Message {
1018            command: proto::BaseCommand {
1019                r#type: CommandType::GetTopicsOfNamespace as i32,
1020                get_topics_of_namespace: Some(proto::CommandGetTopicsOfNamespace {
1021                    request_id,
1022                    namespace,
1023                    mode: Some(mode as i32),
1024                }),
1025                ..Default::default()
1026            },
1027            payload: None,
1028        }
1029    }
1030
1031    pub(crate) fn send(
1032        producer_id: u64,
1033        producer_name: String,
1034        sequence_id: u64,
1035        message: producer::ProducerMessage,
1036    ) -> Message {
1037        let properties = message
1038            .properties
1039            .into_iter()
1040            .map(|(key, value)| proto::KeyValue { key, value })
1041            .collect();
1042
1043        Message {
1044            command: proto::BaseCommand {
1045                r#type: CommandType::Send as i32,
1046                send: Some(proto::CommandSend {
1047                    producer_id,
1048                    sequence_id,
1049                    num_messages: message.num_messages_in_batch,
1050                    ..Default::default()
1051                }),
1052                ..Default::default()
1053            },
1054            payload: Some(Payload {
1055                metadata: proto::MessageMetadata {
1056                    producer_name,
1057                    sequence_id,
1058                    properties,
1059                    publish_time: Utc::now().timestamp_millis() as u64,
1060                    replicated_from: None,
1061                    partition_key: message.partition_key,
1062                    replicate_to: message.replicate_to,
1063                    compression: message.compression,
1064                    uncompressed_size: message.uncompressed_size,
1065                    num_messages_in_batch: message.num_messages_in_batch,
1066                    event_time: message.event_time,
1067                    encryption_keys: message.encryption_keys,
1068                    encryption_algo: message.encryption_algo,
1069                    encryption_param: message.encryption_param,
1070                    schema_version: message.schema_version,
1071                    deliver_at_time: message.deliver_at_time,
1072                    ..Default::default()
1073                },
1074                data: message.payload,
1075            }),
1076        }
1077    }
1078
1079    pub fn lookup_topic(topic: String, authoritative: bool, request_id: u64) -> Message {
1080        Message {
1081            command: proto::BaseCommand {
1082                r#type: CommandType::Lookup as i32,
1083                lookup_topic: Some(proto::CommandLookupTopic {
1084                    topic,
1085                    request_id,
1086                    authoritative: Some(authoritative),
1087                    ..Default::default()
1088                }),
1089                ..Default::default()
1090            },
1091            payload: None,
1092        }
1093    }
1094
1095    pub fn lookup_partitioned_topic(topic: String, request_id: u64) -> Message {
1096        Message {
1097            command: proto::BaseCommand {
1098                r#type: CommandType::PartitionedMetadata as i32,
1099                partition_metadata: Some(proto::CommandPartitionedTopicMetadata {
1100                    topic,
1101                    request_id,
1102                    ..Default::default()
1103                }),
1104                ..Default::default()
1105            },
1106            payload: None,
1107        }
1108    }
1109
1110    pub fn close_producer(producer_id: u64, request_id: u64) -> Message {
1111        Message {
1112            command: proto::BaseCommand {
1113                r#type: CommandType::CloseProducer as i32,
1114                close_producer: Some(proto::CommandCloseProducer {
1115                    producer_id,
1116                    request_id,
1117                }),
1118                ..Default::default()
1119            },
1120            payload: None,
1121        }
1122    }
1123
1124    pub fn subscribe(
1125        topic: String,
1126        subscription: String,
1127        sub_type: SubType,
1128        consumer_id: u64,
1129        request_id: u64,
1130        consumer_name: Option<String>,
1131        options: ConsumerOptions,
1132    ) -> Message {
1133        Message {
1134            command: proto::BaseCommand {
1135                r#type: CommandType::Subscribe as i32,
1136                subscribe: Some(proto::CommandSubscribe {
1137                    topic,
1138                    subscription,
1139                    sub_type: sub_type as i32,
1140                    consumer_id,
1141                    request_id,
1142                    consumer_name,
1143                    priority_level: options.priority_level,
1144                    durable: options.durable,
1145                    metadata: options
1146                        .metadata
1147                        .iter()
1148                        .map(|(k, v)| proto::KeyValue {
1149                            key: k.clone(),
1150                            value: v.clone(),
1151                        })
1152                        .collect(),
1153                    read_compacted: Some(options.read_compacted.unwrap_or(false)),
1154                    initial_position: Some(options.initial_position.into()),
1155                    schema: options.schema,
1156                    start_message_id: options.start_message_id,
1157                    ..Default::default()
1158                }),
1159                ..Default::default()
1160            },
1161            payload: None,
1162        }
1163    }
1164
1165    pub fn flow(consumer_id: u64, message_permits: u32) -> Message {
1166        Message {
1167            command: proto::BaseCommand {
1168                r#type: CommandType::Flow as i32,
1169                flow: Some(proto::CommandFlow {
1170                    consumer_id,
1171                    message_permits,
1172                }),
1173                ..Default::default()
1174            },
1175            payload: None,
1176        }
1177    }
1178
1179    pub fn ack(
1180        consumer_id: u64,
1181        message_id: Vec<proto::MessageIdData>,
1182        cumulative: bool,
1183    ) -> Message {
1184        Message {
1185            command: proto::BaseCommand {
1186                r#type: CommandType::Ack as i32,
1187                ack: Some(proto::CommandAck {
1188                    consumer_id,
1189                    ack_type: if cumulative {
1190                        proto::command_ack::AckType::Cumulative as i32
1191                    } else {
1192                        proto::command_ack::AckType::Individual as i32
1193                    },
1194                    message_id,
1195                    validation_error: None,
1196                    properties: Vec::new(),
1197                    ..Default::default()
1198                }),
1199                ..Default::default()
1200            },
1201            payload: None,
1202        }
1203    }
1204
1205    pub fn redeliver_unacknowleged_messages(
1206        consumer_id: u64,
1207        message_ids: Vec<proto::MessageIdData>,
1208    ) -> Message {
1209        Message {
1210            command: proto::BaseCommand {
1211                r#type: CommandType::RedeliverUnacknowledgedMessages as i32,
1212                redeliver_unacknowledged_messages: Some(
1213                    proto::CommandRedeliverUnacknowledgedMessages {
1214                        consumer_id,
1215                        message_ids,
1216                    },
1217                ),
1218                ..Default::default()
1219            },
1220            payload: None,
1221        }
1222    }
1223
1224    pub fn close_consumer(consumer_id: u64, request_id: u64) -> Message {
1225        Message {
1226            command: proto::BaseCommand {
1227                r#type: CommandType::CloseConsumer as i32,
1228                close_consumer: Some(proto::CommandCloseConsumer {
1229                    consumer_id,
1230                    request_id,
1231                }),
1232                ..Default::default()
1233            },
1234            payload: None,
1235        }
1236    }
1237
1238    pub fn seek(
1239        consumer_id: u64,
1240        request_id: u64,
1241        message_id: Option<MessageIdData>,
1242        message_publish_time: Option<u64>,
1243    ) -> Message {
1244        Message {
1245            command: proto::BaseCommand {
1246                r#type: CommandType::Seek as i32,
1247                seek: Some(proto::CommandSeek {
1248                    consumer_id,
1249                    request_id,
1250                    message_id,
1251                    message_publish_time,
1252                }),
1253                ..Default::default()
1254            },
1255            payload: None,
1256        }
1257    }
1258
1259    pub fn unsubscribe(consumer_id: u64, request_id: u64) -> Message {
1260        Message {
1261            command: proto::BaseCommand {
1262                r#type: CommandType::Unsubscribe as i32,
1263                unsubscribe: Some(proto::CommandUnsubscribe {
1264                    consumer_id,
1265                    request_id,
1266                }),
1267                ..Default::default()
1268            },
1269            payload: None,
1270        }
1271    }
1272}