pulsar/
client.rs

1use std::string::FromUtf8Error;
2use std::sync::Arc;
3
4use futures::channel::{mpsc, oneshot};
5
6use crate::connection::Authentication;
7use crate::connection_manager::{
8    BrokerAddress, ConnectionManager, ConnectionRetryOptions, OperationRetryOptions, TlsOptions,
9};
10use crate::consumer::{ConsumerBuilder, ConsumerOptions, InitialPosition};
11use crate::error::Error;
12use crate::executor::Executor;
13use crate::message::proto::{self, CommandSendReceipt};
14use crate::message::Payload;
15use crate::producer::{self, ProducerBuilder, SendFuture};
16use crate::service_discovery::ServiceDiscovery;
17use futures::StreamExt;
18use futures::lock::Mutex;
19
/// Helper trait for consumer deserialization
///
/// Implementors turn the raw [`Payload`] of a received message into a typed value.
/// `Output` does not have to be `Self`: the `String` implementation in this file,
/// for example, produces a `Result<String, FromUtf8Error>` so that decoding
/// failures are surfaced to the caller instead of panicking.
pub trait DeserializeMessage {
    /// type produced from the message
    type Output: Sized;
    /// deserialize method that will be called by the consumer
    ///
    /// Receives the payload by reference and returns a freshly built `Output`.
    fn deserialize_message(payload: &Payload) -> Self::Output;
}
27
28impl DeserializeMessage for Vec<u8> {
29    type Output = Self;
30
31    fn deserialize_message(payload: &Payload) -> Self::Output {
32        payload.data.to_vec()
33    }
34}
35
36impl DeserializeMessage for String {
37    type Output = Result<String, FromUtf8Error>;
38
39    fn deserialize_message(payload: &Payload) -> Self::Output {
40        String::from_utf8(payload.data.to_vec())
41    }
42}
43
/// Helper trait for message serialization
///
/// Implementors convert a value into a [`producer::Message`] ready to be sent.
/// The value is taken by value, so owned inputs (`Vec<u8>`, `String`) can hand
/// over their buffer while borrowed inputs (`&[u8]`, `&str`) are copied.
pub trait SerializeMessage {
    /// serialize method that will be called by the producer
    ///
    /// Returns the message to send, or an [`Error`] if `input` cannot be serialized.
    fn serialize_message(input: Self) -> Result<producer::Message, Error>;
}
49
50impl SerializeMessage for producer::Message {
51    fn serialize_message(input: Self) -> Result<producer::Message, Error> {
52        Ok(input)
53    }
54}
55
56impl<'a> SerializeMessage for () {
57    fn serialize_message(_input: Self) -> Result<producer::Message, Error> {
58        Ok(producer::Message {
59            ..Default::default()
60        })
61    }
62}
63
64impl<'a> SerializeMessage for &'a [u8] {
65    fn serialize_message(input: Self) -> Result<producer::Message, Error> {
66        Ok(producer::Message {
67            payload: input.to_vec(),
68            ..Default::default()
69        })
70    }
71}
72
73impl SerializeMessage for Vec<u8> {
74    fn serialize_message(input: Self) -> Result<producer::Message, Error> {
75        Ok(producer::Message {
76            payload: input,
77            ..Default::default()
78        })
79    }
80}
81
82impl SerializeMessage for String {
83    fn serialize_message(input: Self) -> Result<producer::Message, Error> {
84        let payload = input.into_bytes();
85        Ok(producer::Message {
86            payload,
87            ..Default::default()
88        })
89    }
90}
91
92impl<'a> SerializeMessage for &String {
93    fn serialize_message(input: Self) -> Result<producer::Message, Error> {
94        let payload = input.as_bytes().to_vec();
95        Ok(producer::Message {
96            payload,
97            ..Default::default()
98        })
99    }
100}
101
102impl<'a> SerializeMessage for &'a str {
103    fn serialize_message(input: Self) -> Result<producer::Message, Error> {
104        let payload = input.as_bytes().to_vec();
105        Ok(producer::Message {
106            payload,
107            ..Default::default()
108        })
109    }
110}
111
/// Pulsar client
///
/// This is the starting point of this API, used to create connections, producers and consumers
///
/// While methods are provided to create the client, producers and consumers directly,
/// the builders should be used for more clarity:
///
/// ```rust,no_run
/// use pulsar::{Pulsar, TokioExecutor};
///
/// # async fn run(auth: pulsar::Authentication, retry: pulsar::ConnectionRetryOptions) -> Result<(), pulsar::Error> {
/// let addr = "pulsar://127.0.0.1:6650";
/// // you can indicate which executor you use as the return type of client creation
/// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor)
///     .with_auth(auth)
///     .with_connection_retry_options(retry)
///     .build()
///     .await?;
///
/// let mut producer = pulsar
///     .producer()
///     .with_topic("non-persistent://public/default/test")
///     .with_name("my producer")
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct Pulsar<Exe: Executor> {
    // shared connection manager; the periodic connection check task only
    // holds a weak reference to it
    pub(crate) manager: Arc<ConnectionManager<Exe>>,
    // backs the `lookup_*` methods
    service_discovery: Arc<ServiceDiscovery<Exe>>,
    // this field is an Option to avoid a cyclic dependency between Pulsar
    // and run_producer: the run_producer loop needs a client to create
    // a multitopic producer, this producer stores internally a copy
    // of the Pulsar struct. So even if we drop the main Pulsar instance,
    // the run_producer loop still lives because it contains a copy of
    // the sender it waits on.
    // To solve this, we create a client without this sender, use it in
    // run_producer, then fill in the producer field afterwards in the
    // main Pulsar instance
    producer: Option<mpsc::UnboundedSender<SendMessage>>,
    // retry options for operations; the same value is handed to the
    // connection manager when the client is created
    pub(crate) operation_retry_options: OperationRetryOptions,
    // executor used to spawn the client's background tasks
    pub(crate) executor: Arc<Exe>,
}
157
158impl<Exe: Executor> Pulsar<Exe> {
159    /// creates a new client
160    pub(crate) async fn new<S: Into<String>>(
161        url: S,
162        auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
163        connection_retry_parameters: Option<ConnectionRetryOptions>,
164        operation_retry_parameters: Option<OperationRetryOptions>,
165        tls_options: Option<TlsOptions>,
166        executor: Exe,
167    ) -> Result<Self, Error> {
168        let url: String = url.into();
169        let executor = Arc::new(executor);
170        let operation_retry_options = operation_retry_parameters.unwrap_or_default();
171        let manager = ConnectionManager::new(
172            url,
173            auth,
174            connection_retry_parameters,
175            operation_retry_options.clone(),
176            tls_options,
177            executor.clone(),
178        )
179        .await?;
180        let manager = Arc::new(manager);
181
182        // set up a regular connection check
183        let weak_manager = Arc::downgrade(&manager);
184        let mut interval = executor.interval(std::time::Duration::from_secs(60));
185        let res = executor.spawn(Box::pin(async move {
186            while let Some(()) = interval.next().await {
187                if let Some(strong_manager) = weak_manager.upgrade() {
188                    strong_manager.check_connections().await;
189                } else {
190                    // if all the strong references to the manager were dropped,
191                    // we can stop the task
192                    break;
193                }
194            }
195        }));
196        if res.is_err() {
197            error!("the executor could not spawn the check connection task");
198            return Err(crate::error::ConnectionError::Shutdown.into());
199        }
200
201        let service_discovery = Arc::new(ServiceDiscovery::with_manager(manager.clone()));
202        let (producer, producer_rx) = mpsc::unbounded();
203
204        let mut client = Pulsar {
205            manager,
206            service_discovery,
207            producer: None,
208            operation_retry_options,
209            executor,
210        };
211
212        let _ = client
213            .executor
214            .spawn(Box::pin(run_producer(client.clone(), producer_rx)));
215        client.producer = Some(producer);
216        Ok(client)
217    }
218
219    /// creates a new client builder
220    ///
221    /// ```rust,no_run
222    /// use pulsar::{Pulsar, TokioExecutor};
223    ///
224    /// # async fn run() -> Result<(), pulsar::Error> {
225    /// let addr = "pulsar://127.0.0.1:6650";
226    /// // you can indicate which executor you use as the return type of client creation
227    /// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor)
228    ///     .build()
229    ///     .await?;
230    /// # Ok(())
231    /// # }
232    /// ```
233    pub fn builder<S: Into<String>>(url: S, executor: Exe) -> PulsarBuilder<Exe> {
234        PulsarBuilder {
235            url: url.into(),
236            auth_provider: None,
237            connection_retry_options: None,
238            operation_retry_options: None,
239            tls_options: None,
240            executor,
241        }
242    }
243
244    /// creates a consumer builder
245    ///
246    /// ```rust,no_run
247    /// use pulsar::{SubType, Consumer};
248    ///
249    /// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
250    /// # type TestData = String;
251    /// let mut consumer: Consumer<TestData, _> = pulsar
252    ///     .consumer()
253    ///     .with_topic("non-persistent://public/default/test")
254    ///     .with_consumer_name("test_consumer")
255    ///     .with_subscription_type(SubType::Exclusive)
256    ///     .with_subscription("test_subscription")
257    ///     .build()
258    ///     .await?;
259    /// # Ok(())
260    /// # }
261    /// ```
262    pub fn consumer(&self) -> ConsumerBuilder<Exe> {
263        ConsumerBuilder::new(self)
264    }
265
266    /// creates a producer builder
267    ///
268    /// ```rust,no_run
269    /// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
270    /// let mut producer = pulsar
271    ///     .producer()
272    ///     .with_topic("non-persistent://public/default/test")
273    ///     .with_name("my producer")
274    ///     .build()
275    ///     .await?;
276    /// # Ok(())
277    /// # }
278    /// ```
279    pub fn producer(&self) -> ProducerBuilder<Exe> {
280        ProducerBuilder::new(self)
281    }
282
283    /// creates a reader builder
284    /// ```rust, no_run
285    /// use pulsar::reader::Reader;
286    ///
287    /// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
288    /// # type TestData = String;
289    /// let mut reader: Reader<TestData, _> = pulsar
290    ///     .reader()
291    ///     .with_topic("non-persistent://public/default/test")
292    ///     .with_consumer_name("my_reader")
293    ///     .into_reader()
294    ///     .await?;
295    /// # Ok(())
296    /// # }
297    /// ```
298    pub fn reader(&self) -> ConsumerBuilder<Exe> {
299        // this makes it exactly the same like the consumer() method though
300        ConsumerBuilder::new(self).with_options(
301            ConsumerOptions::default()
302                .durable(false)
303                .with_initial_position(InitialPosition::Latest),
304        )
305    }
306
307    /// gets the address of a broker handling the topic
308    ///
309    /// ```rust,no_run
310    /// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
311    /// let broker_address = pulsar.lookup_topic("persistent://public/default/test").await?;
312    /// # Ok(())
313    /// # }
314    /// ```
315    pub async fn lookup_topic<S: Into<String>>(&self, topic: S) -> Result<BrokerAddress, Error> {
316        self.service_discovery
317            .lookup_topic(topic)
318            .await
319            .map_err(|e| e.into())
320    }
321
322    /// gets the number of partitions for a partitioned topic
323    ///
324    /// ```rust,no_run
325    /// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
326    /// let nb = pulsar.lookup_partitioned_topic_number("persistent://public/default/test").await?;
327    /// # Ok(())
328    /// # }
329    /// ```
330    pub async fn lookup_partitioned_topic_number<S: Into<String>>(
331        &self,
332        topic: S,
333    ) -> Result<u32, Error> {
334        self.service_discovery
335            .lookup_partitioned_topic_number(topic)
336            .await
337            .map_err(|e| e.into())
338    }
339
340    /// gets the address of brokers handling the topic's partitions. If the topic is not
341    /// a partitioned topic, result will be a single element containing the topic and address
342    /// of the non-partitioned topic provided.
343    ///
344    /// ```rust,no_run
345    /// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
346    /// let broker_addresses = pulsar.lookup_partitioned_topic("persistent://public/default/test").await?;
347    /// # Ok(())
348    /// # }
349    /// ```
350    pub async fn lookup_partitioned_topic<S: Into<String>>(
351        &self,
352        topic: S,
353    ) -> Result<Vec<(String, BrokerAddress)>, Error> {
354        self.service_discovery
355            .lookup_partitioned_topic(topic)
356            .await
357            .map_err(|e| e.into())
358    }
359
360    /// gets the list of topics from a namespace
361    ///
362    /// ```rust,no_run
363    /// use pulsar::message::proto::command_get_topics_of_namespace::Mode;
364    ///
365    /// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
366    /// let topics = pulsar.get_topics_of_namespace("public/default".to_string(), Mode::Persistent).await?;
367    /// # Ok(())
368    /// # }
369    /// ```
370    pub async fn get_topics_of_namespace(
371        &self,
372        namespace: String,
373        mode: proto::command_get_topics_of_namespace::Mode,
374    ) -> Result<Vec<String>, Error> {
375        let conn = self.manager.get_base_connection().await?;
376        let topics = conn
377            .sender()
378            .get_topics_of_namespace(namespace, mode)
379            .await?;
380        Ok(topics.topics)
381    }
382
383    /// Sends a message on a topic.
384    ///
385    /// This function will lazily initialize and re-use producers as needed. For better
386    /// control over producers, creating and using a `Producer` is recommended.
387    ///
388    /// ```rust,no_run
389    /// use pulsar::message::proto::command_get_topics_of_namespace::Mode;
390    ///
391    /// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
392    /// let topics = pulsar.send("persistent://public/default/test", "hello world!").await?;
393    /// # Ok(())
394    /// # }
395    /// ```
396    pub async fn send<S: Into<String>, M: SerializeMessage + Sized>(
397        &self,
398        topic: S,
399        message: M,
400    ) -> Result<SendFuture, Error> {
401        let message = M::serialize_message(message)?;
402        self.send_raw(message, topic).await
403    }
404
405    async fn send_raw<S: Into<String>>(
406        &self,
407        message: producer::Message,
408        topic: S,
409    ) -> Result<SendFuture, Error> {
410        let (resolver, future) = oneshot::channel();
411        self.producer
412            .as_ref()
413            .expect("a client without the producer channel should only be used internally")
414            .unbounded_send(SendMessage {
415                topic: topic.into(),
416                message,
417                resolver,
418            })
419            .map_err(|_| Error::Custom("producer unexpectedly disconnected".into()))?;
420        Ok(SendFuture(future))
421    }
422}
423
/// Helper structure to generate a [Pulsar] client
///
/// Created by `Pulsar::builder`; every optional setting starts as `None` and is
/// filled in by the `with_*` methods before `build` creates the client.
pub struct PulsarBuilder<Exe: Executor> {
    // address of the broker to connect to
    url: String,
    // optional authentication provider, wrapped in `Arc<Mutex<_>>` by `build`
    auth_provider: Option<Box<dyn crate::authentication::Authentication>>,
    // retry options for connections; `None` is passed on as-is to `Pulsar::new`
    connection_retry_options: Option<ConnectionRetryOptions>,
    // retry options for operations; `Pulsar::new` uses the default when `None`
    operation_retry_options: Option<OperationRetryOptions>,
    // TLS settings, created on demand by the TLS-related `with_*` methods
    tls_options: Option<TlsOptions>,
    // executor the client will spawn its background tasks on
    executor: Exe,
}
433
434impl<Exe: Executor> PulsarBuilder<Exe> {
435    /// Authentication parameters (JWT, Biscuit, etc)
436    pub fn with_auth(self, auth: Authentication) -> Self {
437        self.with_auth_provider(Box::new(auth))
438    }
439
440    pub fn with_auth_provider(mut self, auth: Box<dyn crate::authentication::Authentication>) -> Self {
441        self.auth_provider = Some(auth);
442        self
443    }
444
445    /// Exponential back off parameters for automatic reconnection
446    pub fn with_connection_retry_options(
447        mut self,
448        connection_retry_options: ConnectionRetryOptions,
449    ) -> Self {
450        self.connection_retry_options = Some(connection_retry_options);
451        self
452    }
453
454    /// Retry parameters for Pulsar operations
455    pub fn with_operation_retry_options(
456        mut self,
457        operation_retry_options: OperationRetryOptions,
458    ) -> Self {
459        self.operation_retry_options = Some(operation_retry_options);
460        self
461    }
462
463    /// add a custom certificate chain to authenticate the server in TLS connections
464    pub fn with_certificate_chain(mut self, certificate_chain: Vec<u8>) -> Self {
465        match &mut self.tls_options {
466            Some(tls) => tls.certificate_chain = Some(certificate_chain),
467            None => {
468                self.tls_options = Some(TlsOptions {
469                    certificate_chain: Some(certificate_chain),
470                    ..Default::default()
471                })
472            }
473        }
474        self
475    }
476
477    pub fn with_allow_insecure_connection(mut self, allow: bool) -> Self {
478        match &mut self.tls_options {
479            Some(tls) => tls.allow_insecure_connection = allow,
480            None => {
481                self.tls_options = Some(TlsOptions {
482                    allow_insecure_connection: allow,
483                    ..Default::default()
484                })
485            }
486        }
487        self
488    }
489
490    pub fn with_tls_hostname_verification_enabled(mut self, enabled: bool) -> Self {
491        match &mut self.tls_options {
492            Some(tls) => tls.tls_hostname_verification_enabled = enabled,
493            None => {
494                self.tls_options = Some(TlsOptions {
495                    tls_hostname_verification_enabled: enabled,
496                    ..Default::default()
497                })
498            }
499        }
500        self
501    }
502
503    /// add a custom certificate chain from a file to authenticate the server in TLS connections
504    pub fn with_certificate_chain_file<P: AsRef<std::path::Path>>(
505        self,
506        path: P,
507    ) -> Result<Self, std::io::Error> {
508        use std::io::Read;
509
510        let mut file = std::fs::File::open(path)?;
511        let mut v = vec![];
512        file.read_to_end(&mut v)?;
513
514        Ok(self.with_certificate_chain(v))
515    }
516
517    /// creates the Pulsar client and connects it
518    pub async fn build(self) -> Result<Pulsar<Exe>, Error> {
519        let PulsarBuilder {
520            url,
521            auth_provider,
522            connection_retry_options,
523            operation_retry_options,
524            tls_options,
525            executor,
526        } = self;
527
528        Pulsar::new(
529            url,
530            auth_provider.map(|p| Arc::new(Mutex::new(p))),
531            connection_retry_options,
532            operation_retry_options,
533            tls_options,
534            executor,
535        )
536        .await
537    }
538}
539
/// Request sent from `Pulsar::send_raw` to the `run_producer` task over the
/// client's unbounded channel.
struct SendMessage {
    // topic the message must be sent on
    topic: String,
    // already serialized message
    message: producer::Message,
    // sending half of the oneshot channel on which run_producer reports the
    // send receipt or the error; the receiving half is wrapped in the
    // `SendFuture` returned to the caller
    resolver: oneshot::Sender<Result<CommandSendReceipt, Error>>,
}
545
546async fn run_producer<Exe: Executor>(
547    client: Pulsar<Exe>,
548    mut messages: mpsc::UnboundedReceiver<SendMessage>,
549) {
550    let mut producer = client.producer().build_multi_topic();
551    while let Some(SendMessage {
552        topic,
553        message: payload,
554        resolver,
555    }) = messages.next().await
556    {
557        match producer.send(topic, payload).await {
558            Ok(future) => {
559                let _ = client.executor.spawn(Box::pin(async move {
560                    let _ = resolver.send(future.await);
561                }));
562            }
563            Err(e) => {
564                let _ = resolver.send(Err(e));
565            }
566        }
567    }
568}