Expand description
§Pure Rust async await client for Apache Pulsar
This is a pure Rust client for Apache Pulsar that does not depend on the C++ Pulsar library. It provides an async/await based API, compatible with Tokio and async-std.
Features:
- URL based (
pulsar://
andpulsar+ssl://
) connections with DNS lookup - multi topic consumers (based on a regex)
- TLS connection
- configurable executor (Tokio or async-std)
- automatic reconnection with exponential back off
- message batching
- compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features)
§Examples
Copy this into your project’s Cargo.toml:
[dependencies]
env_logger = "0.9"
pulsar = "4.1.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
log = "0.4.6"
futures = "0.3"
§Producing
use pulsar::{
message::proto, producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor,
};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct TestData {
data: String,
}
impl SerializeMessage for TestData {
fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
let payload =
serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?;
Ok(producer::Message {
payload,
..Default::default()
})
}
}
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
env_logger::init();
let addr = "pulsar://127.0.0.1:6650";
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
let mut producer = pulsar
.producer()
.with_topic("non-persistent://public/default/test")
.with_name("my producer")
.with_options(producer::ProducerOptions {
schema: Some(proto::Schema {
r#type: proto::schema::Type::String as i32,
..Default::default()
}),
..Default::default()
})
.build()
.await?;
let mut counter = 0usize;
loop {
producer
.send(TestData {
data: "data".to_string(),
})
.await?;
counter += 1;
println!("{} messages", counter);
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
}
}
§Consuming
use futures::TryStreamExt;
use pulsar::{
message::{proto::command_subscribe::SubType, Payload},
Consumer, DeserializeMessage, Pulsar, TokioExecutor,
};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct TestData {
data: String,
}
impl DeserializeMessage for TestData {
type Output = Result<TestData, serde_json::Error>;
fn deserialize_message(payload: &Payload) -> Self::Output {
serde_json::from_slice(&payload.data)
}
}
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
env_logger::init();
let addr = "pulsar://127.0.0.1:6650";
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
let mut consumer: Consumer<TestData, _> = pulsar
.consumer()
.with_topic("test")
.with_consumer_name("test_consumer")
.with_subscription_type(SubType::Exclusive)
.with_subscription("test_subscription")
.build()
.await?;
let mut counter = 0usize;
while let Some(msg) = consumer.try_next().await? {
consumer.ack(&msg).await?;
let data = match msg.deserialize() {
Ok(data) => data,
Err(e) => {
log::error!("could not deserialize message: {:?}", e);
break;
}
};
if data.data.as_str() != "data" {
log::error!("Unexpected payload: {}", &data.data);
break;
}
counter += 1;
log::info!("got {} messages", counter);
}
Ok(())
}
Re-exports§
pub use consumer::Consumer;
pub use consumer::ConsumerBuilder;
pub use consumer::ConsumerOptions;
pub use error::Error;
pub use executor::AsyncStdExecutor;
pub use executor::Executor;
pub use executor::TokioExecutor;
pub use message::proto;
pub use message::proto::command_subscribe::SubType;
pub use message::proto::CommandSendReceipt;
pub use message::Payload;
pub use producer::MultiTopicProducer;
pub use producer::Producer;
pub use producer::ProducerOptions;
Modules§
- authentication
- compression
- Compression strategy configs
- consumer
- Topic subscriptions
- error
- Error types
- executor
- executor abstraction
- message
- low level structures used to send and process raw messages
- producer
- Message publication
- reader
Structs§
- Authentication
- Authentication parameters
- Broker
Address - holds connection information for a broker
- Connection
Retry Options - configuration for reconnection exponential back off
- Operation
Retry Options - configuration for Pulsar operation retries
- Pulsar
- Pulsar client
- Pulsar
Builder - Helper structure to generate a Pulsar client
- TlsOptions
- configuration for TLS connections
Traits§
- Deserialize
Message - Helper trait for consumer deserialization
- Serialize
Message - Helper trait for message serialization