1use std::string::FromUtf8Error;
2use std::sync::Arc;
3
4use futures::channel::{mpsc, oneshot};
5
6use crate::connection::Authentication;
7use crate::connection_manager::{
8 BrokerAddress, ConnectionManager, ConnectionRetryOptions, OperationRetryOptions, TlsOptions,
9};
10use crate::consumer::{ConsumerBuilder, ConsumerOptions, InitialPosition};
11use crate::error::Error;
12use crate::executor::Executor;
13use crate::message::proto::{self, CommandSendReceipt};
14use crate::message::Payload;
15use crate::producer::{self, ProducerBuilder, SendFuture};
16use crate::service_discovery::ServiceDiscovery;
17use futures::StreamExt;
18use futures::lock::Mutex;
19
20pub trait DeserializeMessage {
22 type Output: Sized;
24 fn deserialize_message(payload: &Payload) -> Self::Output;
26}
27
28impl DeserializeMessage for Vec<u8> {
29 type Output = Self;
30
31 fn deserialize_message(payload: &Payload) -> Self::Output {
32 payload.data.to_vec()
33 }
34}
35
36impl DeserializeMessage for String {
37 type Output = Result<String, FromUtf8Error>;
38
39 fn deserialize_message(payload: &Payload) -> Self::Output {
40 String::from_utf8(payload.data.to_vec())
41 }
42}
43
44pub trait SerializeMessage {
46 fn serialize_message(input: Self) -> Result<producer::Message, Error>;
48}
49
50impl SerializeMessage for producer::Message {
51 fn serialize_message(input: Self) -> Result<producer::Message, Error> {
52 Ok(input)
53 }
54}
55
56impl<'a> SerializeMessage for () {
57 fn serialize_message(_input: Self) -> Result<producer::Message, Error> {
58 Ok(producer::Message {
59 ..Default::default()
60 })
61 }
62}
63
64impl<'a> SerializeMessage for &'a [u8] {
65 fn serialize_message(input: Self) -> Result<producer::Message, Error> {
66 Ok(producer::Message {
67 payload: input.to_vec(),
68 ..Default::default()
69 })
70 }
71}
72
73impl SerializeMessage for Vec<u8> {
74 fn serialize_message(input: Self) -> Result<producer::Message, Error> {
75 Ok(producer::Message {
76 payload: input,
77 ..Default::default()
78 })
79 }
80}
81
82impl SerializeMessage for String {
83 fn serialize_message(input: Self) -> Result<producer::Message, Error> {
84 let payload = input.into_bytes();
85 Ok(producer::Message {
86 payload,
87 ..Default::default()
88 })
89 }
90}
91
92impl<'a> SerializeMessage for &String {
93 fn serialize_message(input: Self) -> Result<producer::Message, Error> {
94 let payload = input.as_bytes().to_vec();
95 Ok(producer::Message {
96 payload,
97 ..Default::default()
98 })
99 }
100}
101
102impl<'a> SerializeMessage for &'a str {
103 fn serialize_message(input: Self) -> Result<producer::Message, Error> {
104 let payload = input.as_bytes().to_vec();
105 Ok(producer::Message {
106 payload,
107 ..Default::default()
108 })
109 }
110}
111
112#[derive(Clone)]
141pub struct Pulsar<Exe: Executor> {
142 pub(crate) manager: Arc<ConnectionManager<Exe>>,
143 service_discovery: Arc<ServiceDiscovery<Exe>>,
144 producer: Option<mpsc::UnboundedSender<SendMessage>>,
154 pub(crate) operation_retry_options: OperationRetryOptions,
155 pub(crate) executor: Arc<Exe>,
156}
157
158impl<Exe: Executor> Pulsar<Exe> {
159 pub(crate) async fn new<S: Into<String>>(
161 url: S,
162 auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
163 connection_retry_parameters: Option<ConnectionRetryOptions>,
164 operation_retry_parameters: Option<OperationRetryOptions>,
165 tls_options: Option<TlsOptions>,
166 executor: Exe,
167 ) -> Result<Self, Error> {
168 let url: String = url.into();
169 let executor = Arc::new(executor);
170 let operation_retry_options = operation_retry_parameters.unwrap_or_default();
171 let manager = ConnectionManager::new(
172 url,
173 auth,
174 connection_retry_parameters,
175 operation_retry_options.clone(),
176 tls_options,
177 executor.clone(),
178 )
179 .await?;
180 let manager = Arc::new(manager);
181
182 let weak_manager = Arc::downgrade(&manager);
184 let mut interval = executor.interval(std::time::Duration::from_secs(60));
185 let res = executor.spawn(Box::pin(async move {
186 while let Some(()) = interval.next().await {
187 if let Some(strong_manager) = weak_manager.upgrade() {
188 strong_manager.check_connections().await;
189 } else {
190 break;
193 }
194 }
195 }));
196 if res.is_err() {
197 error!("the executor could not spawn the check connection task");
198 return Err(crate::error::ConnectionError::Shutdown.into());
199 }
200
201 let service_discovery = Arc::new(ServiceDiscovery::with_manager(manager.clone()));
202 let (producer, producer_rx) = mpsc::unbounded();
203
204 let mut client = Pulsar {
205 manager,
206 service_discovery,
207 producer: None,
208 operation_retry_options,
209 executor,
210 };
211
212 let _ = client
213 .executor
214 .spawn(Box::pin(run_producer(client.clone(), producer_rx)));
215 client.producer = Some(producer);
216 Ok(client)
217 }
218
219 pub fn builder<S: Into<String>>(url: S, executor: Exe) -> PulsarBuilder<Exe> {
234 PulsarBuilder {
235 url: url.into(),
236 auth_provider: None,
237 connection_retry_options: None,
238 operation_retry_options: None,
239 tls_options: None,
240 executor,
241 }
242 }
243
244 pub fn consumer(&self) -> ConsumerBuilder<Exe> {
263 ConsumerBuilder::new(self)
264 }
265
266 pub fn producer(&self) -> ProducerBuilder<Exe> {
280 ProducerBuilder::new(self)
281 }
282
283 pub fn reader(&self) -> ConsumerBuilder<Exe> {
299 ConsumerBuilder::new(self).with_options(
301 ConsumerOptions::default()
302 .durable(false)
303 .with_initial_position(InitialPosition::Latest),
304 )
305 }
306
307 pub async fn lookup_topic<S: Into<String>>(&self, topic: S) -> Result<BrokerAddress, Error> {
316 self.service_discovery
317 .lookup_topic(topic)
318 .await
319 .map_err(|e| e.into())
320 }
321
322 pub async fn lookup_partitioned_topic_number<S: Into<String>>(
331 &self,
332 topic: S,
333 ) -> Result<u32, Error> {
334 self.service_discovery
335 .lookup_partitioned_topic_number(topic)
336 .await
337 .map_err(|e| e.into())
338 }
339
340 pub async fn lookup_partitioned_topic<S: Into<String>>(
351 &self,
352 topic: S,
353 ) -> Result<Vec<(String, BrokerAddress)>, Error> {
354 self.service_discovery
355 .lookup_partitioned_topic(topic)
356 .await
357 .map_err(|e| e.into())
358 }
359
360 pub async fn get_topics_of_namespace(
371 &self,
372 namespace: String,
373 mode: proto::command_get_topics_of_namespace::Mode,
374 ) -> Result<Vec<String>, Error> {
375 let conn = self.manager.get_base_connection().await?;
376 let topics = conn
377 .sender()
378 .get_topics_of_namespace(namespace, mode)
379 .await?;
380 Ok(topics.topics)
381 }
382
383 pub async fn send<S: Into<String>, M: SerializeMessage + Sized>(
397 &self,
398 topic: S,
399 message: M,
400 ) -> Result<SendFuture, Error> {
401 let message = M::serialize_message(message)?;
402 self.send_raw(message, topic).await
403 }
404
405 async fn send_raw<S: Into<String>>(
406 &self,
407 message: producer::Message,
408 topic: S,
409 ) -> Result<SendFuture, Error> {
410 let (resolver, future) = oneshot::channel();
411 self.producer
412 .as_ref()
413 .expect("a client without the producer channel should only be used internally")
414 .unbounded_send(SendMessage {
415 topic: topic.into(),
416 message,
417 resolver,
418 })
419 .map_err(|_| Error::Custom("producer unexpectedly disconnected".into()))?;
420 Ok(SendFuture(future))
421 }
422}
423
424pub struct PulsarBuilder<Exe: Executor> {
426 url: String,
427 auth_provider: Option<Box<dyn crate::authentication::Authentication>>,
428 connection_retry_options: Option<ConnectionRetryOptions>,
429 operation_retry_options: Option<OperationRetryOptions>,
430 tls_options: Option<TlsOptions>,
431 executor: Exe,
432}
433
434impl<Exe: Executor> PulsarBuilder<Exe> {
435 pub fn with_auth(self, auth: Authentication) -> Self {
437 self.with_auth_provider(Box::new(auth))
438 }
439
440 pub fn with_auth_provider(mut self, auth: Box<dyn crate::authentication::Authentication>) -> Self {
441 self.auth_provider = Some(auth);
442 self
443 }
444
445 pub fn with_connection_retry_options(
447 mut self,
448 connection_retry_options: ConnectionRetryOptions,
449 ) -> Self {
450 self.connection_retry_options = Some(connection_retry_options);
451 self
452 }
453
454 pub fn with_operation_retry_options(
456 mut self,
457 operation_retry_options: OperationRetryOptions,
458 ) -> Self {
459 self.operation_retry_options = Some(operation_retry_options);
460 self
461 }
462
463 pub fn with_certificate_chain(mut self, certificate_chain: Vec<u8>) -> Self {
465 match &mut self.tls_options {
466 Some(tls) => tls.certificate_chain = Some(certificate_chain),
467 None => {
468 self.tls_options = Some(TlsOptions {
469 certificate_chain: Some(certificate_chain),
470 ..Default::default()
471 })
472 }
473 }
474 self
475 }
476
477 pub fn with_allow_insecure_connection(mut self, allow: bool) -> Self {
478 match &mut self.tls_options {
479 Some(tls) => tls.allow_insecure_connection = allow,
480 None => {
481 self.tls_options = Some(TlsOptions {
482 allow_insecure_connection: allow,
483 ..Default::default()
484 })
485 }
486 }
487 self
488 }
489
490 pub fn with_tls_hostname_verification_enabled(mut self, enabled: bool) -> Self {
491 match &mut self.tls_options {
492 Some(tls) => tls.tls_hostname_verification_enabled = enabled,
493 None => {
494 self.tls_options = Some(TlsOptions {
495 tls_hostname_verification_enabled: enabled,
496 ..Default::default()
497 })
498 }
499 }
500 self
501 }
502
503 pub fn with_certificate_chain_file<P: AsRef<std::path::Path>>(
505 self,
506 path: P,
507 ) -> Result<Self, std::io::Error> {
508 use std::io::Read;
509
510 let mut file = std::fs::File::open(path)?;
511 let mut v = vec![];
512 file.read_to_end(&mut v)?;
513
514 Ok(self.with_certificate_chain(v))
515 }
516
517 pub async fn build(self) -> Result<Pulsar<Exe>, Error> {
519 let PulsarBuilder {
520 url,
521 auth_provider,
522 connection_retry_options,
523 operation_retry_options,
524 tls_options,
525 executor,
526 } = self;
527
528 Pulsar::new(
529 url,
530 auth_provider.map(|p| Arc::new(Mutex::new(p))),
531 connection_retry_options,
532 operation_retry_options,
533 tls_options,
534 executor,
535 )
536 .await
537 }
538}
539
540struct SendMessage {
541 topic: String,
542 message: producer::Message,
543 resolver: oneshot::Sender<Result<CommandSendReceipt, Error>>,
544}
545
546async fn run_producer<Exe: Executor>(
547 client: Pulsar<Exe>,
548 mut messages: mpsc::UnboundedReceiver<SendMessage>,
549) {
550 let mut producer = client.producer().build_multi_topic();
551 while let Some(SendMessage {
552 topic,
553 message: payload,
554 resolver,
555 }) = messages.next().await
556 {
557 match producer.send(topic, payload).await {
558 Ok(future) => {
559 let _ = client.executor.spawn(Box::pin(async move {
560 let _ = resolver.send(future.await);
561 }));
562 }
563 Err(e) => {
564 let _ = resolver.send(Err(e));
565 }
566 }
567 }
568}