Expand description
§Pure Rust async await client for Apache Pulsar
This is a pure Rust client for Apache Pulsar that does not depend on the C++ Pulsar library. It provides an async/await based API, compatible with Tokio and async-std.
Features:
- URL based (
pulsar://andpulsar+ssl://) connections with DNS lookup - multi topic consumers (based on a regex)
 - TLS connection
 - configurable executor (Tokio or async-std)
 - automatic reconnection with exponential back off
 - message batching
 - compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features)
 
§Examples
Copy this into your project’s Cargo.toml:
[dependencies]
env_logger = "0.9"
pulsar = "4.1.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
log = "0.4.6"
futures = "0.3"§Producing
use pulsar::{
    message::proto, producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor,
};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct TestData {
    data: String,
}
impl SerializeMessage for TestData {
    fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
        let payload =
            serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?;
        Ok(producer::Message {
            payload,
            ..Default::default()
        })
    }
}
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();
    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
    let mut producer = pulsar
        .producer()
        .with_topic("non-persistent://public/default/test")
        .with_name("my producer")
        .with_options(producer::ProducerOptions {
            schema: Some(proto::Schema {
                r#type: proto::schema::Type::String as i32,
                ..Default::default()
            }),
            ..Default::default()
        })
        .build()
        .await?;
    let mut counter = 0usize;
    loop {
        producer
            .send(TestData {
                data: "data".to_string(),
            })
            .await?;
        counter += 1;
        println!("{} messages", counter);
        tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
    }
}§Consuming
use futures::TryStreamExt;
use pulsar::{
    message::{proto::command_subscribe::SubType, Payload},
    Consumer, DeserializeMessage, Pulsar, TokioExecutor,
};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct TestData {
    data: String,
}
impl DeserializeMessage for TestData {
    type Output = Result<TestData, serde_json::Error>;
    fn deserialize_message(payload: &Payload) -> Self::Output {
        serde_json::from_slice(&payload.data)
    }
}
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();
    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
    let mut consumer: Consumer<TestData, _> = pulsar
        .consumer()
        .with_topic("test")
        .with_consumer_name("test_consumer")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("test_subscription")
        .build()
        .await?;
    let mut counter = 0usize;
    while let Some(msg) = consumer.try_next().await? {
        consumer.ack(&msg).await?;
        let data = match msg.deserialize() {
            Ok(data) => data,
            Err(e) => {
                log::error!("could not deserialize message: {:?}", e);
                break;
            }
        };
        if data.data.as_str() != "data" {
            log::error!("Unexpected payload: {}", &data.data);
            break;
        }
        counter += 1;
        log::info!("got {} messages", counter);
    }
    Ok(())
}Re-exports§
pub use consumer::Consumer;pub use consumer::ConsumerBuilder;pub use consumer::ConsumerOptions;pub use error::Error;pub use executor::AsyncStdExecutor;pub use executor::Executor;pub use executor::TokioExecutor;pub use message::proto;pub use message::proto::command_subscribe::SubType;pub use message::proto::CommandSendReceipt;pub use message::Payload;pub use producer::MultiTopicProducer;pub use producer::Producer;pub use producer::ProducerOptions;
Modules§
- authentication
 - compression
 - Compression strategy configs
 - consumer
 - Topic subscriptions
 - error
 - Error types
 - executor
 - executor abstraction
 - message
 - low level structures used to send and process raw messages
 - producer
 - Message publication
 - reader
 - routing_
policy  
Structs§
- Authentication
 - Authentication parameters
 - Broker
Address  - holds connection information for a broker
 - Connection
Retry Options  - configuration for reconnection exponential back off
 - Operation
Retry Options  - configuration for Pulsar operation retries
 - Pulsar
 - Pulsar client
 - Pulsar
Builder  - Helper structure to generate a Pulsar client
 - TlsOptions
 - configuration for TLS connections
 
Traits§
- Deserialize
Message  - Helper trait for consumer deserialization
 - Serialize
Message  - Helper trait for message serialization