pulsar/
lib.rs

1//! # Pure Rust async await client for Apache Pulsar
2//!
3//! This is a pure Rust client for Apache Pulsar that does not depend on the
4//! C++ Pulsar library. It provides an async/await based API, compatible with
5//! [Tokio](https://tokio.rs/) and [async-std](https://async.rs/).
6//!
7//! Features:
8//! - URL based (`pulsar://` and `pulsar+ssl://`) connections with DNS lookup
9//! - multi topic consumers (based on a regex)
10//! - TLS connection
11//! - configurable executor (Tokio or async-std)
12//! - automatic reconnection with exponential back off
13//! - message batching
14//! - compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features)
15//!
16//! ## Examples
17//!
18//! Copy this into your project's Cargo.toml:
19//!
20//! ```toml
21//! [dependencies]
22//! env_logger = "0.9"
23//! pulsar = "4.1.1"
24//! serde = { version = "1.0", features = ["derive"] }
25//! serde_json = "1.0"
26//! tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
27//! log = "0.4.6"
28//! futures = "0.3"
29//! ```
30//!
31//! ### Producing
32//! ```rust,no_run
33//! use pulsar::{
34//!     message::proto, producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor,
35//! };
36//! use serde::{Deserialize, Serialize};
37//!
38//! #[derive(Serialize, Deserialize)]
39//! struct TestData {
40//!     data: String,
41//! }
42//!
43//! impl SerializeMessage for TestData {
44//!     fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
45//!         let payload = serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?;
46//!         Ok(producer::Message {
47//!             payload,
48//!             ..Default::default()
49//!         })
50//!     }
51//! }
52//!
53//! #[tokio::main]
54//! async fn main() -> Result<(), pulsar::Error> {
55//!     env_logger::init();
56//!
57//!     let addr = "pulsar://127.0.0.1:6650";
58//!     let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
59//!     let mut producer = pulsar
60//!         .producer()
61//!         .with_topic("non-persistent://public/default/test")
62//!         .with_name("my producer")
63//!         .with_options(producer::ProducerOptions {
64//!             schema: Some(proto::Schema {
65//!                 r#type: proto::schema::Type::String as i32,
66//!                 ..Default::default()
67//!             }),
68//!             ..Default::default()
69//!         })
70//!         .build()
71//!         .await?;
72//!
73//!     let mut counter = 0usize;
74//!     loop {
75//!         producer
76//!             .send(TestData {
77//!                 data: "data".to_string(),
78//!             })
79//!             .await?;
80//!
81//!         counter += 1;
82//!         println!("{} messages", counter);
83//!         tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
84//!     }
85//! }
86//! ```
87//!
88//! ### Consuming
89//! ```rust,no_run
90//! use futures::TryStreamExt;
91//! use pulsar::{
92//!     message::proto::command_subscribe::SubType, message::Payload, Consumer, DeserializeMessage,
93//!     Pulsar, TokioExecutor,
94//! };
95//! use serde::{Deserialize, Serialize};
96//!
97//! #[derive(Serialize, Deserialize)]
98//! struct TestData {
99//!     data: String,
100//! }
101//!
102//! impl DeserializeMessage for TestData {
103//!     type Output = Result<TestData, serde_json::Error>;
104//!
105//!     fn deserialize_message(payload: &Payload) -> Self::Output {
106//!         serde_json::from_slice(&payload.data)
107//!     }
108//! }
109//!
110//! #[tokio::main]
111//! async fn main() -> Result<(), pulsar::Error> {
112//!     env_logger::init();
113//!
114//!     let addr = "pulsar://127.0.0.1:6650";
115//!     let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
116//!
117//!     let mut consumer: Consumer<TestData, _> = pulsar
118//!         .consumer()
119//!         .with_topic("test")
120//!         .with_consumer_name("test_consumer")
121//!         .with_subscription_type(SubType::Exclusive)
122//!         .with_subscription("test_subscription")
123//!         .build()
124//!         .await?;
125//!
126//!     let mut counter = 0usize;
127//!     while let Some(msg) = consumer.try_next().await? {
128//!         consumer.ack(&msg).await?;
129//!         let data = match msg.deserialize() {
130//!             Ok(data) => data,
131//!             Err(e) => {
132//!                 log::error!("could not deserialize message: {:?}", e);
133//!                 break;
134//!             }
135//!         };
136//!
137//!         if data.data.as_str() != "data" {
138//!             log::error!("Unexpected payload: {}", &data.data);
139//!             break;
140//!         }
141//!         counter += 1;
142//!         log::info!("got {} messages", counter);
143//!     }
144//!
145//!     Ok(())
146//! }
147//! ```
148#![allow(clippy::too_many_arguments)]
149#![allow(clippy::large_enum_variant)]
150extern crate futures;
151#[macro_use]
152extern crate log;
153extern crate nom;
154extern crate prost_derive;
155
156#[cfg(test)]
157#[macro_use]
158extern crate serde;
159
160pub use client::{DeserializeMessage, Pulsar, PulsarBuilder, SerializeMessage};
161pub use connection::Authentication;
162pub use connection_manager::{
163    BrokerAddress, ConnectionRetryOptions, OperationRetryOptions, TlsOptions,
164};
165pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions};
166pub use error::Error;
167#[cfg(feature = "async-std-runtime")]
168pub use executor::AsyncStdExecutor;
169pub use executor::Executor;
170#[cfg(feature = "tokio-runtime")]
171pub use executor::TokioExecutor;
172pub use message::proto::command_subscribe::SubType;
173pub use message::{
174    proto::{self, CommandSendReceipt},
175    Payload,
176};
177pub use producer::{MultiTopicProducer, Producer, ProducerOptions};
178
179mod client;
180mod connection;
181mod connection_manager;
182pub mod consumer;
183pub mod error;
184pub mod executor;
185pub mod message;
186pub mod producer;
187pub mod reader;
188pub mod authentication;
189mod service_discovery;
190
191#[cfg(test)]
192mod tests {
193    use futures::{future::try_join_all, StreamExt};
194    use log::{LevelFilter, Metadata, Record};
195    use std::collections::BTreeSet;
196    use std::time::{Duration, Instant};
197
198    #[cfg(feature = "tokio-runtime")]
199    use tokio::time::timeout;
200
201    #[cfg(feature = "tokio-runtime")]
202    use crate::executor::TokioExecutor;
203
204    use crate::client::SerializeMessage;
205    use crate::consumer::{InitialPosition, Message};
206    use crate::message::proto::command_subscribe::SubType;
207    use crate::message::Payload;
208    use crate::Error as PulsarError;
209
210    use super::*;
211
212    #[derive(Debug, Serialize, Deserialize)]
213    struct TestData {
214        pub id: u64,
215        pub data: String,
216    }
217
218    impl<'a> SerializeMessage for &'a TestData {
219        fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
220            let payload =
221                serde_json::to_vec(input).map_err(|e| PulsarError::Custom(e.to_string()))?;
222            Ok(producer::Message {
223                payload,
224                ..Default::default()
225            })
226        }
227    }
228
229    impl DeserializeMessage for TestData {
230        type Output = Result<TestData, serde_json::Error>;
231
232        fn deserialize_message(payload: &Payload) -> Self::Output {
233            serde_json::from_slice(&payload.data)
234        }
235    }
236
237    #[derive(Debug)]
238    enum Error {
239        Pulsar(PulsarError),
240        Timeout(std::io::Error),
241        Serde(serde_json::Error),
242        Utf8(std::string::FromUtf8Error),
243    }
244
245    impl From<std::io::Error> for Error {
246        fn from(e: std::io::Error) -> Self {
247            Error::Timeout(e)
248        }
249    }
250
251    impl From<PulsarError> for Error {
252        fn from(e: PulsarError) -> Self {
253            Error::Pulsar(e)
254        }
255    }
256
257    impl From<serde_json::Error> for Error {
258        fn from(e: serde_json::Error) -> Self {
259            Error::Serde(e)
260        }
261    }
262
263    impl From<std::string::FromUtf8Error> for Error {
264        fn from(err: std::string::FromUtf8Error) -> Self {
265            Error::Utf8(err)
266        }
267    }
268
269    impl std::fmt::Display for Error {
270        fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
271            match self {
272                Error::Pulsar(e) => write!(f, "{}", e),
273                Error::Timeout(e) => write!(f, "{}", e),
274                Error::Serde(e) => write!(f, "{}", e),
275                Error::Utf8(e) => write!(f, "{}", e),
276            }
277        }
278    }
279
280    pub struct SimpleLogger {
281        pub tag: &'static str,
282    }
283    impl log::Log for SimpleLogger {
284        fn enabled(&self, _metadata: &Metadata) -> bool {
285            //metadata.level() <= Level::Info
286            true
287        }
288
289        fn log(&self, record: &Record) {
290            if self.enabled(record.metadata()) {
291                println!(
292                    "{} {} {}\t{}\t{}",
293                    chrono::Utc::now(),
294                    self.tag,
295                    record.level(),
296                    record.module_path().unwrap(),
297                    record.args()
298                );
299            }
300        }
301        fn flush(&self) {}
302    }
303
304    pub static TEST_LOGGER: SimpleLogger = SimpleLogger { tag: "" };
305
306    #[tokio::test]
307    #[cfg(feature = "tokio-runtime")]
308    async fn round_trip() {
309        let _ = log::set_logger(&TEST_LOGGER);
310        let _ = log::set_max_level(LevelFilter::Debug);
311
312        let addr = "pulsar://127.0.0.1:6650";
313        let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
314
315        // random topic to better allow multiple test runs while debugging
316        let topic = format!("test_{}", rand::random::<u16>());
317
318        let mut producer = pulsar.producer().with_topic(&topic).build().await.unwrap();
319        info!("producer created");
320
321        let message_ids: BTreeSet<u64> = (0..100).collect();
322
323        info!("will send message");
324        let mut sends = Vec::new();
325        for &id in &message_ids {
326            let message = TestData {
327                data: "data".to_string(),
328                id,
329            };
330            sends.push(producer.send(&message).await.unwrap());
331        }
332        try_join_all(sends).await.unwrap();
333
334        info!("sent");
335
336        let mut consumer: Consumer<TestData, _> = pulsar
337            .consumer()
338            .with_topic(&topic)
339            .with_consumer_name("test_consumer")
340            .with_subscription_type(SubType::Exclusive)
341            .with_subscription("test_subscription")
342            .with_options(ConsumerOptions {
343                initial_position: InitialPosition::Earliest,
344                ..Default::default()
345            })
346            .build()
347            .await
348            .unwrap();
349
350        info!("consumer created");
351
352        let topics = consumer.topics();
353        debug!("consumer connected to {:?}", topics);
354        assert_eq!(topics.len(), 1);
355        assert!(topics[0].ends_with(&topic));
356
357        let mut received = BTreeSet::new();
358        while let Ok(Some(msg)) = timeout(Duration::from_secs(10), consumer.next()).await {
359            let msg: Message<TestData> = msg.unwrap();
360            info!("id: {:?}", msg.message_id());
361            received.insert(msg.deserialize().unwrap().id);
362            consumer.ack(&msg).await.unwrap();
363            if received.len() == message_ids.len() {
364                break;
365            }
366        }
367        assert_eq!(received.len(), message_ids.len());
368        assert_eq!(received, message_ids);
369    }
370
371    #[tokio::test]
372    #[cfg(feature = "tokio-runtime")]
373    async fn unsized_data() {
374        let _ = log::set_logger(&TEST_LOGGER);
375        let _ = log::set_max_level(LevelFilter::Debug);
376
377        let addr = "pulsar://127.0.0.1:6650";
378        let test_id: u16 = rand::random();
379        let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
380
381        // test &str
382        {
383            let topic = format!("test_unsized_data_str_{}", test_id);
384            let send_data = "some unsized data";
385
386            pulsar
387                .send(&topic, send_data.to_string())
388                .await
389                .unwrap()
390                .await
391                .unwrap();
392
393            let mut consumer = pulsar
394                .consumer()
395                .with_topic(&topic)
396                .with_subscription_type(SubType::Exclusive)
397                .with_subscription("test_subscription")
398                .with_options(ConsumerOptions {
399                    initial_position: InitialPosition::Earliest,
400                    ..Default::default()
401                })
402                .build::<String>()
403                .await
404                .unwrap();
405
406            let msg = timeout(Duration::from_secs(1), consumer.next())
407                .await
408                .unwrap()
409                .unwrap()
410                .unwrap();
411            consumer.ack(&msg).await.unwrap();
412
413            let data = msg.deserialize().unwrap();
414            if data.as_str() != send_data {
415                panic!("Unexpected payload in &str test: {}", &data);
416            }
417        }
418
419        // test &[u8]
420        {
421            let topic = format!("test_unsized_data_bytes_{}", test_id);
422            let send_data: &[u8] = &[0, 1, 2, 3];
423
424            pulsar
425                .send(&topic, send_data.to_vec())
426                .await
427                .unwrap()
428                .await
429                .unwrap();
430
431            let mut consumer = pulsar
432                .consumer()
433                .with_topic(&topic)
434                .with_subscription_type(SubType::Exclusive)
435                .with_subscription("test_subscription")
436                .with_options(ConsumerOptions {
437                    initial_position: InitialPosition::Earliest,
438                    ..Default::default()
439                })
440                .build::<Vec<u8>>()
441                .await
442                .unwrap();
443
444            let msg: Message<Vec<u8>> = timeout(Duration::from_secs(1), consumer.next())
445                .await
446                .unwrap()
447                .unwrap()
448                .unwrap();
449            consumer.ack(&msg).await.unwrap();
450            let data = msg.deserialize();
451            if data.as_slice() != send_data {
452                panic!("Unexpected payload in &[u8] test: {:?}", &data);
453            }
454        }
455    }
456
457    #[tokio::test]
458    #[cfg(feature = "tokio-runtime")]
459    async fn redelivery() {
460        let _ = log::set_logger(&TEST_LOGGER);
461        let _ = log::set_max_level(LevelFilter::Debug);
462
463        let addr = "pulsar://127.0.0.1:6650";
464        let topic = format!("test_redelivery_{}", rand::random::<u16>());
465
466        let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
467        pulsar
468            .send(&topic, String::from("data"))
469            .await
470            .unwrap()
471            .await
472            .unwrap();
473
474        let mut consumer: Consumer<String, _> = pulsar
475            .consumer()
476            .with_topic(topic)
477            .with_unacked_message_resend_delay(Some(Duration::from_millis(100)))
478            .with_options(ConsumerOptions {
479                initial_position: InitialPosition::Earliest,
480                ..Default::default()
481            })
482            .build()
483            .await
484            .unwrap();
485
486        let _first_receipt = timeout(Duration::from_secs(2), consumer.next())
487            .await
488            .unwrap()
489            .unwrap()
490            .unwrap();
491        let first_received = Instant::now();
492        let second_receipt = timeout(Duration::from_secs(2), consumer.next())
493            .await
494            .unwrap()
495            .unwrap()
496            .unwrap();
497        let redelivery = first_received.elapsed();
498        consumer.ack(&second_receipt).await.unwrap();
499
500        assert!(redelivery < Duration::from_secs(1));
501    }
502
503    #[tokio::test]
504    #[cfg(feature = "tokio-runtime")]
505    async fn batching() {
506        let _ = log::set_logger(&TEST_LOGGER);
507        let _ = log::set_max_level(LevelFilter::Debug);
508
509        let addr = "pulsar://127.0.0.1:6650";
510        let topic = format!("test_batching_{}", rand::random::<u16>());
511
512        let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
513        let mut producer = pulsar
514            .producer()
515            .with_topic(&topic)
516            .with_options(ProducerOptions {
517                batch_size: Some(5),
518                ..Default::default()
519            })
520            .build()
521            .await
522            .unwrap();
523
524        let mut consumer: Consumer<String, _> =
525            pulsar.consumer().with_topic(topic).build().await.unwrap();
526
527        let mut send_receipts = Vec::new();
528        for i in 0..4 {
529            send_receipts.push(producer.send(i.to_string()).await.unwrap());
530        }
531        assert!(timeout(Duration::from_millis(100), consumer.next())
532            .await
533            .is_err());
534
535        send_receipts.push(producer.send(5.to_string()).await.unwrap());
536
537        timeout(Duration::from_millis(100), try_join_all(send_receipts))
538            .await
539            .unwrap()
540            .unwrap();
541
542        let mut count = 0;
543        while let Some(message) = timeout(Duration::from_millis(100), consumer.next())
544            .await
545            .unwrap()
546        {
547            let message = message.unwrap();
548            count += 1;
549            let _ = consumer.ack(&message).await;
550            if count >= 5 {
551                break;
552            }
553        }
554
555        assert_eq!(count, 5);
556        let mut send_receipts = Vec::new();
557        for i in 5..9 {
558            send_receipts.push(producer.send(i.to_string()).await.unwrap());
559        }
560        producer.send_batch().await.unwrap();
561        timeout(Duration::from_millis(100), try_join_all(send_receipts))
562            .await
563            .unwrap()
564            .unwrap();
565        while let Some(message) = timeout(Duration::from_millis(100), consumer.next())
566            .await
567            .unwrap()
568        {
569            let message = message.unwrap();
570            count += 1;
571            let _ = consumer.ack(&message).await;
572            if count >= 9 {
573                break;
574            }
575        }
576        assert_eq!(count, 9);
577    }
578}