[][src]Crate pulsar

Pure Rust async/await client for Apache Pulsar

This is a pure Rust client for Apache Pulsar that does not depend on the C++ Pulsar library. It provides an async/await based API, compatible with Tokio and async-std.

Features:

  • URL based (pulsar:// and pulsar+ssl://) connections with DNS lookup
  • multi topic consumers (based on a regex)
  • TLS connection
  • configurable executor (Tokio or async-std)
  • automatic reconnection with exponential backoff
  • message batching
  • compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features)

Examples

Producing

#[macro_use]
extern crate serde;
use pulsar::{
    message::proto, producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor,
};

/// Example payload type for the producer.
///
/// The serde derives make it usable with `serde_json`, which the
/// `SerializeMessage` implementation in this example relies on.
#[derive(Serialize, Deserialize)]
struct TestData {
    /// Text carried by the message.
    data: String,
}

impl SerializeMessage for TestData {
    /// Encodes `input` as JSON and places the resulting bytes in the payload
    /// of a `producer::Message`; every other message field keeps its default.
    ///
    /// A JSON encoding failure is reported as `PulsarError::Custom` carrying
    /// the error's text.
    fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
        match serde_json::to_vec(&input) {
            Ok(payload) => Ok(producer::Message {
                payload,
                ..Default::default()
            }),
            Err(json_err) => Err(PulsarError::Custom(json_err.to_string())),
        }
    }
}

/// Connects to a Pulsar broker, creates a producer with a string schema and
/// then sends one `TestData` message every two seconds, printing a running
/// count. The loop only ends when connecting, building or sending fails.
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let client: Pulsar<_> = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
        .build()
        .await?;

    // Declare a string schema; all remaining producer options stay at their defaults.
    let string_schema = proto::Schema {
        type_: proto::schema::Type::String as i32,
        ..Default::default()
    };
    let options = producer::ProducerOptions {
        schema: Some(string_schema),
        ..Default::default()
    };

    let mut producer = client
        .producer()
        .with_topic("non-persistent://public/default/test")
        .with_name("my producer")
        .with_options(options)
        .build()
        .await?;

    // Pause between two consecutive sends.
    let pause = std::time::Duration::from_millis(2000);
    let mut sent = 0usize;
    loop {
        let message = TestData {
            data: "data".to_string(),
        };
        producer.send(message).await?;

        sent += 1;
        println!("{} messages", sent);
        tokio::time::delay_for(pause).await;
    }
}

Consuming

#[macro_use]
extern crate serde;
use futures::TryStreamExt;
use pulsar::{
    message::proto::command_subscribe::SubType, message::Payload, Consumer, DeserializeMessage,
    Pulsar, TokioExecutor,
};

/// Example payload type for the consumer.
///
/// The serde derives make it usable with `serde_json`, which the
/// `DeserializeMessage` implementation in this example relies on.
#[derive(Serialize, Deserialize)]
struct TestData {
    /// Text carried by the message.
    data: String,
}

impl DeserializeMessage for TestData {
    /// Deserialization may fail, so the output is a `Result` carrying the
    /// `serde_json` error.
    type Output = Result<TestData, serde_json::Error>;

    /// Parses the payload's data bytes as a JSON-encoded `TestData`.
    fn deserialize_message(payload: &Payload) -> Self::Output {
        let raw: &[u8] = &payload.data;
        serde_json::from_slice(raw)
    }
}

/// Connects to a Pulsar broker, subscribes to the `test` topic with an
/// exclusive subscription and counts the messages received.
///
/// Each message is acknowledged as soon as it arrives, before it is decoded.
/// The loop stops when the stream ends, when a message cannot be
/// deserialized, or when its `data` field is not `"data"`; in all of these
/// cases the function returns `Ok(())`. Errors from the client itself
/// (connecting, receiving, acknowledging) are propagated.
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let client: Pulsar<_> = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
        .build()
        .await?;

    let mut consumer: Consumer<TestData, _> = client
        .consumer()
        .with_topic("test")
        .with_consumer_name("test_consumer")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("test_subscription")
        .build()
        .await?;

    let mut received = 0usize;
    loop {
        let msg = match consumer.try_next().await? {
            Some(msg) => msg,
            // The stream is exhausted: nothing more to read.
            None => break,
        };
        consumer.ack(&msg).await?;

        match msg.deserialize() {
            Ok(data) if data.data.as_str() == "data" => {
                received += 1;
                log::info!("got {} messages", received);
            }
            Ok(data) => {
                log::error!("Unexpected payload: {}", &data.data);
                break;
            }
            Err(e) => {
                log::error!("could not deserialize message: {:?}", e);
                break;
            }
        }
    }

    Ok(())
}

Re-exports

pub use consumer::Consumer;
pub use consumer::ConsumerBuilder;
pub use consumer::ConsumerOptions;
pub use error::Error;
pub use executor::AsyncStdExecutor;
pub use executor::Executor;
pub use executor::TokioExecutor;
pub use message::proto::command_subscribe::SubType;
pub use producer::MultiTopicProducer;
pub use producer::Producer;
pub use producer::ProducerOptions;
pub use message::Payload;
pub use message::proto;
pub use message::proto::CommandSendReceipt;

Modules

consumer

Topic subscriptions

error

Error types

executor
message

Low-level structures used to send and process raw messages

producer

Message publication

Structs

Authentication

Authentication parameters

BackOffOptions

Configuration for exponential backoff on reconnection

BrokerAddress

Holds connection information for a broker

Pulsar

Pulsar client

TlsOptions

Configuration for TLS connections

Traits

DeserializeMessage

Helper trait for consumer deserialization

SerializeMessage

Helper trait for message serialization