Crate pulsar

Source
Expand description

§Pure Rust async/await client for Apache Pulsar

This is a pure Rust client for Apache Pulsar that does not depend on the C++ Pulsar library. It provides an async/await based API, compatible with Tokio and async-std.

Features:

  • URL based (pulsar:// and pulsar+ssl://) connections with DNS lookup
  • multi topic consumers (based on a regex)
  • TLS connection
  • configurable executor (Tokio or async-std)
  • automatic reconnection with exponential backoff
  • message batching
  • compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features)

§Examples

Copy this into your project’s Cargo.toml:

[dependencies]
env_logger = "0.9"
pulsar = "4.1.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
log = "0.4.6"
futures = "0.3"

§Producing

use pulsar::{
    message::proto, producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor,
};
use serde::{Deserialize, Serialize};

/// Example message type published by the producer below.
///
/// The serde derives let the `SerializeMessage` impl that follows encode it
/// with `serde_json`.
#[derive(Serialize, Deserialize)]
struct TestData {
    // Text carried by the message; the example always sends "data".
    data: String,
}

/// Turns a `TestData` into an outgoing Pulsar message whose payload is the
/// JSON encoding of the value.
impl SerializeMessage for TestData {
    /// Encodes `input` as JSON bytes and wraps them in a `producer::Message`,
    /// leaving every other message field at its default.
    ///
    /// # Errors
    ///
    /// A `serde_json` failure is reported as `PulsarError::Custom` carrying
    /// the error's text.
    fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
        match serde_json::to_vec(&input) {
            Ok(payload) => Ok(producer::Message {
                payload,
                ..Default::default()
            }),
            Err(json_err) => Err(PulsarError::Custom(json_err.to_string())),
        }
    }
}

/// Connects to a local Pulsar broker and publishes one `TestData` message
/// every two seconds, printing the running total after each send.
///
/// The loop has no exit condition: the function only returns when connecting,
/// building the producer, or sending fails, in which case the error is
/// propagated to the caller.
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let broker_url = "pulsar://127.0.0.1:6650";
    let client: Pulsar<_> = Pulsar::builder(broker_url, TokioExecutor)
        .build()
        .await?;

    // Assemble the producer options first so the builder chain stays short.
    let schema = proto::Schema {
        r#type: proto::schema::Type::String as i32,
        ..Default::default()
    };
    let options = producer::ProducerOptions {
        schema: Some(schema),
        ..Default::default()
    };

    let mut publisher = client
        .producer()
        .with_topic("non-persistent://public/default/test")
        .with_name("my producer")
        .with_options(options)
        .build()
        .await?;

    // Pause between two consecutive sends.
    let pause = std::time::Duration::from_millis(2000);
    let mut sent = 0usize;
    loop {
        let message = TestData {
            data: "data".to_string(),
        };
        publisher.send(message).await?;

        sent += 1;
        println!("{} messages", sent);
        tokio::time::sleep(pause).await;
    }
}

§Consuming

use futures::TryStreamExt;
use pulsar::{
    message::{proto::command_subscribe::SubType, Payload},
    Consumer, DeserializeMessage, Pulsar, TokioExecutor,
};
use serde::{Deserialize, Serialize};

/// Example message type the consumer below expects to receive.
///
/// The serde derives let the `DeserializeMessage` impl that follows decode it
/// with `serde_json`.
#[derive(Serialize, Deserialize)]
struct TestData {
    // Text carried by the message; the consumer loop expects it to be "data".
    data: String,
}

/// Decodes an incoming Pulsar payload as a JSON-encoded `TestData`.
impl DeserializeMessage for TestData {
    /// Decoding can fail, so the output is a `Result` that carries the
    /// `serde_json` error instead of panicking.
    type Output = Result<TestData, serde_json::Error>;

    /// Parses the raw bytes of `payload` as JSON.
    fn deserialize_message(payload: &Payload) -> Self::Output {
        let raw_bytes: &[u8] = &payload.data;
        serde_json::from_slice(raw_bytes)
    }
}

/// Connects to a local Pulsar broker, subscribes to the topic the producing
/// example publishes to, and logs a running count of the messages received.
///
/// The loop stops (and the function returns `Ok(())`) when the message stream
/// ends, when a message cannot be deserialized, or when a message carries an
/// unexpected payload.
///
/// # Errors
///
/// Connection, subscription, receive and acknowledgement failures are
/// propagated to the caller.
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;

    let mut consumer: Consumer<TestData, _> = pulsar
        .consumer()
        // Use the same fully qualified topic as the producing example. A bare
        // "test" names a different (persistent) topic, so this consumer would
        // never see the messages that example sends.
        .with_topic("non-persistent://public/default/test")
        .with_consumer_name("test_consumer")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("test_subscription")
        .build()
        .await?;

    let mut counter = 0usize;
    while let Some(msg) = consumer.try_next().await? {
        // NOTE(review): the message is acknowledged before it is validated, so
        // a message that fails the checks below has already been acked —
        // confirm this is the delivery behaviour you want in real code.
        consumer.ack(&msg).await?;
        let data = match msg.deserialize() {
            Ok(data) => data,
            Err(e) => {
                log::error!("could not deserialize message: {:?}", e);
                break;
            }
        };

        // The producing example only ever sends "data"; anything else means
        // we are reading someone else's messages.
        if data.data.as_str() != "data" {
            log::error!("Unexpected payload: {}", &data.data);
            break;
        }
        counter += 1;
        log::info!("got {} messages", counter);
    }

    Ok(())
}

Re-exports§

pub use consumer::Consumer;
pub use consumer::ConsumerBuilder;
pub use consumer::ConsumerOptions;
pub use error::Error;
pub use executor::AsyncStdExecutor;
pub use executor::Executor;
pub use executor::TokioExecutor;
pub use message::proto;
pub use message::proto::command_subscribe::SubType;
pub use message::proto::CommandSendReceipt;
pub use message::Payload;
pub use producer::MultiTopicProducer;
pub use producer::Producer;
pub use producer::ProducerOptions;

Modules§

authentication
compression
Compression strategy configs
consumer
Topic subscriptions
error
Error types
executor
executor abstraction
message
low level structures used to send and process raw messages
producer
Message publication
reader

Structs§

Authentication
Authentication parameters
BrokerAddress
holds connection information for a broker
ConnectionRetryOptions
configuration for reconnection exponential backoff
OperationRetryOptions
configuration for Pulsar operation retries
Pulsar
Pulsar client
PulsarBuilder
Helper structure to generate a Pulsar client
TlsOptions
configuration for TLS connections

Traits§

DeserializeMessage
Helper trait for consumer deserialization
SerializeMessage
Helper trait for message serialization