1#![allow(clippy::too_many_arguments)]
149#![allow(clippy::large_enum_variant)]
150extern crate futures;
151#[macro_use]
152extern crate log;
153extern crate nom;
154extern crate prost_derive;
155
156#[cfg(test)]
157#[macro_use]
158extern crate serde;
159
160pub use client::{DeserializeMessage, Pulsar, PulsarBuilder, SerializeMessage};
161pub use connection::Authentication;
162pub use connection_manager::{
163 BrokerAddress, ConnectionRetryOptions, OperationRetryOptions, TlsOptions,
164};
165pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions};
166pub use error::Error;
167#[cfg(feature = "async-std-runtime")]
168pub use executor::AsyncStdExecutor;
169pub use executor::Executor;
170#[cfg(feature = "tokio-runtime")]
171pub use executor::TokioExecutor;
172pub use message::proto::command_subscribe::SubType;
173pub use message::{
174 proto::{self, CommandSendReceipt},
175 Payload,
176};
177pub use producer::{MultiTopicProducer, Producer, ProducerOptions};
178
179mod client;
180mod connection;
181mod connection_manager;
182pub mod consumer;
183pub mod error;
184pub mod executor;
185pub mod message;
186pub mod producer;
187pub mod reader;
188pub mod authentication;
189mod service_discovery;
190
191#[cfg(test)]
192mod tests {
193 use futures::{future::try_join_all, StreamExt};
194 use log::{LevelFilter, Metadata, Record};
195 use std::collections::BTreeSet;
196 use std::time::{Duration, Instant};
197
198 #[cfg(feature = "tokio-runtime")]
199 use tokio::time::timeout;
200
201 #[cfg(feature = "tokio-runtime")]
202 use crate::executor::TokioExecutor;
203
204 use crate::client::SerializeMessage;
205 use crate::consumer::{InitialPosition, Message};
206 use crate::message::proto::command_subscribe::SubType;
207 use crate::message::Payload;
208 use crate::Error as PulsarError;
209
210 use super::*;
211
212 #[derive(Debug, Serialize, Deserialize)]
213 struct TestData {
214 pub id: u64,
215 pub data: String,
216 }
217
218 impl<'a> SerializeMessage for &'a TestData {
219 fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
220 let payload =
221 serde_json::to_vec(input).map_err(|e| PulsarError::Custom(e.to_string()))?;
222 Ok(producer::Message {
223 payload,
224 ..Default::default()
225 })
226 }
227 }
228
229 impl DeserializeMessage for TestData {
230 type Output = Result<TestData, serde_json::Error>;
231
232 fn deserialize_message(payload: &Payload) -> Self::Output {
233 serde_json::from_slice(&payload.data)
234 }
235 }
236
237 #[derive(Debug)]
238 enum Error {
239 Pulsar(PulsarError),
240 Timeout(std::io::Error),
241 Serde(serde_json::Error),
242 Utf8(std::string::FromUtf8Error),
243 }
244
245 impl From<std::io::Error> for Error {
246 fn from(e: std::io::Error) -> Self {
247 Error::Timeout(e)
248 }
249 }
250
251 impl From<PulsarError> for Error {
252 fn from(e: PulsarError) -> Self {
253 Error::Pulsar(e)
254 }
255 }
256
257 impl From<serde_json::Error> for Error {
258 fn from(e: serde_json::Error) -> Self {
259 Error::Serde(e)
260 }
261 }
262
263 impl From<std::string::FromUtf8Error> for Error {
264 fn from(err: std::string::FromUtf8Error) -> Self {
265 Error::Utf8(err)
266 }
267 }
268
269 impl std::fmt::Display for Error {
270 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
271 match self {
272 Error::Pulsar(e) => write!(f, "{}", e),
273 Error::Timeout(e) => write!(f, "{}", e),
274 Error::Serde(e) => write!(f, "{}", e),
275 Error::Utf8(e) => write!(f, "{}", e),
276 }
277 }
278 }
279
280 pub struct SimpleLogger {
281 pub tag: &'static str,
282 }
283 impl log::Log for SimpleLogger {
284 fn enabled(&self, _metadata: &Metadata) -> bool {
285 true
287 }
288
289 fn log(&self, record: &Record) {
290 if self.enabled(record.metadata()) {
291 println!(
292 "{} {} {}\t{}\t{}",
293 chrono::Utc::now(),
294 self.tag,
295 record.level(),
296 record.module_path().unwrap(),
297 record.args()
298 );
299 }
300 }
301 fn flush(&self) {}
302 }
303
304 pub static TEST_LOGGER: SimpleLogger = SimpleLogger { tag: "" };
305
/// Publishes 100 JSON messages to a randomly named topic, then checks that a
/// consumer subscribed from the earliest position receives every id.
#[tokio::test]
#[cfg(feature = "tokio-runtime")]
async fn round_trip() {
    let _ = log::set_logger(&TEST_LOGGER);
    log::set_max_level(LevelFilter::Debug);

    let pulsar: Pulsar<_> = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
        .build()
        .await
        .unwrap();

    let suffix: u16 = rand::random();
    let topic = format!("test_{}", suffix);

    let mut producer = pulsar.producer().with_topic(&topic).build().await.unwrap();
    info!("producer created");

    let expected_ids: BTreeSet<u64> = (0..100).collect();

    info!("will send message");
    let mut pending = Vec::new();
    for id in expected_ids.iter().copied() {
        let outgoing = TestData {
            id,
            data: "data".to_string(),
        };
        // The first await hands the message to the producer; the returned
        // receipt futures are all awaited together after the loop.
        let receipt = producer.send(&outgoing).await.unwrap();
        pending.push(receipt);
    }
    try_join_all(pending).await.unwrap();

    info!("sent");

    let options = ConsumerOptions {
        initial_position: InitialPosition::Earliest,
        ..Default::default()
    };
    let mut consumer: Consumer<TestData, _> = pulsar
        .consumer()
        .with_topic(&topic)
        .with_consumer_name("test_consumer")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("test_subscription")
        .with_options(options)
        .build()
        .await
        .unwrap();

    info!("consumer created");

    let topics = consumer.topics();
    debug!("consumer connected to {:?}", topics);
    assert_eq!(topics.len(), 1);
    assert!(topics[0].ends_with(&topic));

    let mut seen = BTreeSet::new();
    loop {
        // Stop on timeout or end of stream; the asserts below catch a short read.
        let delivered = match timeout(Duration::from_secs(10), consumer.next()).await {
            Ok(Some(item)) => item,
            _ => break,
        };
        let delivered: Message<TestData> = delivered.unwrap();
        info!("id: {:?}", delivered.message_id());
        let decoded = delivered.deserialize().unwrap();
        seen.insert(decoded.id);
        consumer.ack(&delivered).await.unwrap();
        if seen.len() == expected_ids.len() {
            break;
        }
    }
    assert_eq!(seen.len(), expected_ids.len());
    assert_eq!(seen, expected_ids);
}
370
/// Sends an owned `String` and an owned `Vec<u8>` through `Pulsar::send`, each
/// to its own topic, and checks that a consumer reads back the same payload.
#[tokio::test]
#[cfg(feature = "tokio-runtime")]
async fn unsized_data() {
    let _ = log::set_logger(&TEST_LOGGER);
    log::set_max_level(LevelFilter::Debug);

    let test_id: u16 = rand::random();
    let pulsar: Pulsar<_> = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
        .build()
        .await
        .unwrap();

    // String payload.
    {
        let topic = format!("test_unsized_data_str_{}", test_id);
        let send_data = "some unsized data";

        let receipt = pulsar
            .send(&topic, send_data.to_string())
            .await
            .unwrap();
        receipt.await.unwrap();

        let options = ConsumerOptions {
            initial_position: InitialPosition::Earliest,
            ..Default::default()
        };
        let mut consumer = pulsar
            .consumer()
            .with_topic(&topic)
            .with_subscription_type(SubType::Exclusive)
            .with_subscription("test_subscription")
            .with_options(options)
            .build::<String>()
            .await
            .unwrap();

        let next = timeout(Duration::from_secs(1), consumer.next())
            .await
            .unwrap();
        let msg = next.unwrap().unwrap();
        consumer.ack(&msg).await.unwrap();

        let data = msg.deserialize().unwrap();
        assert!(
            data.as_str() == send_data,
            "Unexpected payload in &str test: {}",
            &data
        );
    }

    // Raw byte payload.
    {
        let topic = format!("test_unsized_data_bytes_{}", test_id);
        let send_data: &[u8] = &[0, 1, 2, 3];

        let receipt = pulsar.send(&topic, send_data.to_vec()).await.unwrap();
        receipt.await.unwrap();

        let options = ConsumerOptions {
            initial_position: InitialPosition::Earliest,
            ..Default::default()
        };
        let mut consumer = pulsar
            .consumer()
            .with_topic(&topic)
            .with_subscription_type(SubType::Exclusive)
            .with_subscription("test_subscription")
            .with_options(options)
            .build::<Vec<u8>>()
            .await
            .unwrap();

        let next = timeout(Duration::from_secs(1), consumer.next())
            .await
            .unwrap();
        let msg: Message<Vec<u8>> = next.unwrap().unwrap();
        consumer.ack(&msg).await.unwrap();

        let data = msg.deserialize();
        assert!(
            data.as_slice() == send_data,
            "Unexpected payload in &[u8] test: {:?}",
            &data
        );
    }
}
456
/// Leaves the first delivery unacknowledged on a consumer configured with a
/// 100 ms unacked-resend delay, and asserts that the message arrives a second
/// time less than one second after the first.
#[tokio::test]
#[cfg(feature = "tokio-runtime")]
async fn redelivery() {
    let _ = log::set_logger(&TEST_LOGGER);
    log::set_max_level(LevelFilter::Debug);

    let suffix: u16 = rand::random();
    let topic = format!("test_redelivery_{}", suffix);

    let pulsar: Pulsar<_> = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
        .build()
        .await
        .unwrap();

    let receipt = pulsar
        .send(&topic, String::from("data"))
        .await
        .unwrap();
    receipt.await.unwrap();

    let options = ConsumerOptions {
        initial_position: InitialPosition::Earliest,
        ..Default::default()
    };
    let mut consumer: Consumer<String, _> = pulsar
        .consumer()
        .with_topic(topic)
        .with_unacked_message_resend_delay(Some(Duration::from_millis(100)))
        .with_options(options)
        .build()
        .await
        .unwrap();

    let wait = Duration::from_secs(2);

    // Deliberately never acked, so the consumer has to resend it.
    let _initial_delivery = timeout(wait, consumer.next())
        .await
        .unwrap()
        .unwrap()
        .unwrap();
    let first_seen_at = Instant::now();

    let resent = timeout(wait, consumer.next())
        .await
        .unwrap()
        .unwrap()
        .unwrap();
    let gap = first_seen_at.elapsed();
    consumer.ack(&resent).await.unwrap();

    assert!(gap < Duration::from_secs(1));
}
502
/// Exercises a producer configured with `batch_size: Some(5)`:
/// after four sends the consumer must see nothing within 100 ms, the fifth
/// send must make all five arrive, and four further sends followed by an
/// explicit `send_batch()` must bring the received total to nine.
#[tokio::test]
#[cfg(feature = "tokio-runtime")]
async fn batching() {
    let _ = log::set_logger(&TEST_LOGGER);
    log::set_max_level(LevelFilter::Debug);

    let suffix: u16 = rand::random();
    let topic = format!("test_batching_{}", suffix);

    let pulsar: Pulsar<_> = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
        .build()
        .await
        .unwrap();

    let producer_options = ProducerOptions {
        batch_size: Some(5),
        ..Default::default()
    };
    let mut producer = pulsar
        .producer()
        .with_topic(&topic)
        .with_options(producer_options)
        .build()
        .await
        .unwrap();

    let mut consumer: Consumer<String, _> =
        pulsar.consumer().with_topic(topic).build().await.unwrap();

    // Every wait in this test uses the same 100 ms window.
    let window = Duration::from_millis(100);

    // Phase 1: four messages are sent and the consumer must still time out.
    let mut receipts = Vec::new();
    for i in 0..4 {
        let receipt = producer.send(i.to_string()).await.unwrap();
        receipts.push(receipt);
    }
    let premature = timeout(window, consumer.next()).await;
    assert!(premature.is_err());

    // Phase 2: the fifth message must let every receipt resolve in time.
    receipts.push(producer.send(5.to_string()).await.unwrap());

    timeout(window, try_join_all(receipts))
        .await
        .unwrap()
        .unwrap();

    let mut received = 0;
    loop {
        let delivered = match timeout(window, consumer.next()).await.unwrap() {
            Some(item) => item.unwrap(),
            None => break,
        };
        received += 1;
        let _ = consumer.ack(&delivered).await;
        if received >= 5 {
            break;
        }
    }

    assert_eq!(received, 5);

    // Phase 3: four more messages, then an explicit `send_batch()` call.
    let mut receipts = Vec::new();
    for i in 5..9 {
        let receipt = producer.send(i.to_string()).await.unwrap();
        receipts.push(receipt);
    }
    producer.send_batch().await.unwrap();
    timeout(window, try_join_all(receipts))
        .await
        .unwrap()
        .unwrap();

    loop {
        let delivered = match timeout(window, consumer.next()).await.unwrap() {
            Some(item) => item.unwrap(),
            None => break,
        };
        received += 1;
        let _ = consumer.ack(&delivered).await;
        if received >= 9 {
            break;
        }
    }
    assert_eq!(received, 9);
}
578}