[−][src]Crate pulsar
Pure Rust async await client for Apache Pulsar
This is a pure Rust client for Apache Pulsar that does not depend on the C++ Pulsar library. It provides an async/await based API, compatible with Tokio and async-std.
Features:
- URL based (
pulsar://
andpulsar+ssl://
) connections with DNS lookup - multi topic consumers (based on a regex)
- TLS connection
- configurable executor (Tokio or async-std)
- automatic reconnection with exponential back off
- message batching
- compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features)
Examples
Producing
#[macro_use] extern crate serde; use pulsar::{ message::proto, producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor, }; #[derive(Serialize, Deserialize)] struct TestData { data: String, } impl SerializeMessage for TestData { fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> { let payload = serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?; Ok(producer::Message { payload, ..Default::default() }) } } #[tokio::main] async fn main() -> Result<(), pulsar::Error> { env_logger::init(); let addr = "pulsar://127.0.0.1:6650"; let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?; let mut producer = pulsar .producer() .with_topic("non-persistent://public/default/test") .with_name("my producer") .with_options(producer::ProducerOptions { schema: Some(proto::Schema { type_: proto::schema::Type::String as i32, ..Default::default() }), ..Default::default() }) .build() .await?; let mut counter = 0usize; loop { producer .send(TestData { data: "data".to_string(), }) .await?; counter += 1; println!("{} messages", counter); tokio::time::delay_for(std::time::Duration::from_millis(2000)).await; } }
Consuming
#[macro_use] extern crate serde; use futures::TryStreamExt; use pulsar::{ message::proto::command_subscribe::SubType, message::Payload, Consumer, DeserializeMessage, Pulsar, TokioExecutor, }; #[derive(Serialize, Deserialize)] struct TestData { data: String, } impl DeserializeMessage for TestData { type Output = Result<TestData, serde_json::Error>; fn deserialize_message(payload: &Payload) -> Self::Output { serde_json::from_slice(&payload.data) } } #[tokio::main] async fn main() -> Result<(), pulsar::Error> { env_logger::init(); let addr = "pulsar://127.0.0.1:6650"; let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?; let mut consumer: Consumer<TestData, _> = pulsar .consumer() .with_topic("test") .with_consumer_name("test_consumer") .with_subscription_type(SubType::Exclusive) .with_subscription("test_subscription") .build() .await?; let mut counter = 0usize; while let Some(msg) = consumer.try_next().await? { consumer.ack(&msg).await?; let data = match msg.deserialize() { Ok(data) => data, Err(e) => { log::error!("could not deserialize message: {:?}", e); break; } }; if data.data.as_str() != "data" { log::error!("Unexpected payload: {}", &data.data); break; } counter += 1; log::info!("got {} messages", counter); } Ok(()) }
Re-exports
pub use consumer::Consumer; |
pub use consumer::ConsumerBuilder; |
pub use consumer::ConsumerOptions; |
pub use error::Error; |
pub use executor::AsyncStdExecutor; |
pub use executor::Executor; |
pub use executor::TokioExecutor; |
pub use message::proto::command_subscribe::SubType; |
pub use producer::MultiTopicProducer; |
pub use producer::Producer; |
pub use producer::ProducerOptions; |
pub use message::Payload; |
pub use message::proto; |
pub use message::proto::CommandSendReceipt; |
Modules
consumer | Topic subscriptions |
error | Error types |
executor | |
message | low level structures used to send and process raw messages |
producer | Message publication |
Structs
Authentication | Authentication parameters |
BackOffOptions | configuration for reconnection exponential back off |
BrokerAddress | holds connection information for a broker |
Pulsar | Pulsar client |
TlsOptions | configuration for TLS connections |
Traits
DeserializeMessage | Helper trait for consumer deserialization |
SerializeMessage | Helper trait for message serialization |