use super::options::BatchingOptions;
use crate::publisher::actor::BundledMessage;
use crate::publisher::actor::ToDispatcher;
use crate::publisher::builder::PublisherBuilder;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
pub use super::base_publisher::BasePublisher;
#[derive(Debug, Clone)]
pub struct Publisher {
#[cfg_attr(not(test), expect(dead_code))]
pub(crate) batching_options: BatchingOptions,
pub(crate) tx: UnboundedSender<ToDispatcher>,
}
impl Publisher {
pub fn builder<T: Into<String>>(topic: T) -> PublisherBuilder {
PublisherBuilder::new(topic.into())
}
#[must_use = "ignoring the publish result may lead to undetected delivery failures"]
pub fn publish(&self, msg: crate::model::Message) -> crate::publisher::PublishFuture {
let (tx, rx) = tokio::sync::oneshot::channel();
if self
.tx
.send(ToDispatcher::Publish(BundledMessage { msg, tx }))
.is_err()
{
}
crate::publisher::PublishFuture { rx }
}
pub async fn flush(&self) {
let (tx, rx) = oneshot::channel();
if self.tx.send(ToDispatcher::Flush(tx)).is_ok() {
let _ = rx.await;
}
}
pub fn resume_publish<T: std::convert::Into<std::string::String>>(&self, ordering_key: T) {
let _ = self
.tx
.send(ToDispatcher::ResumePublish(ordering_key.into()));
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::publisher::builder::PublisherPartialBuilder;
use crate::publisher::client::BasePublisher;
use crate::publisher::constants::*;
use crate::publisher::options::BatchingOptions;
use crate::{
generated::gapic_dataplane::client::Publisher as GapicPublisher,
model::{Message, PublishResponse},
};
use google_cloud_test_macros::tokio_test_no_panics;
use mockall::Sequence;
use rand::{RngExt, distr::Alphanumeric};
use std::error::Error;
use std::time::Duration;
static TOPIC: &str = "my-topic";
mockall::mock! {
#[derive(Debug)]
GapicPublisher {}
impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisher {
async fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> crate::Result<crate::Response<crate::model::PublishResponse>>;
}
}
mockall::mock! {
#[derive(Debug)]
GapicPublisherWithFuture {}
impl crate::generated::gapic_dataplane::stub::Publisher for GapicPublisherWithFuture {
fn publish(&self, req: crate::model::PublishRequest, _options: crate::RequestOptions) -> impl Future<Output=crate::Result<crate::Response<crate::model::PublishResponse>>> + Send;
}
}
fn publish_ok(
req: crate::model::PublishRequest,
_options: crate::RequestOptions,
) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
let ids = req
.messages
.iter()
.map(|m| String::from_utf8(m.data.to_vec()).unwrap());
Ok(crate::Response::from(
PublishResponse::new().set_message_ids(ids),
))
}
fn publish_err(
_req: crate::model::PublishRequest,
_options: crate::RequestOptions,
) -> crate::Result<crate::Response<crate::model::PublishResponse>> {
Err(crate::Error::service(
google_cloud_gax::error::rpc::Status::default()
.set_code(google_cloud_gax::error::rpc::Code::Unknown)
.set_message("unknown error has occurred"),
))
}
#[track_caller]
fn assert_publish_err(got_err: crate::error::PublishError) {
assert!(
matches!(got_err, crate::error::PublishError::Rpc(_)),
"{got_err:?}"
);
let source = got_err
.source()
.and_then(|e| e.downcast_ref::<std::sync::Arc<crate::Error>>())
.expect("send error should contain a source");
assert!(source.status().is_some(), "{got_err:?}");
assert_eq!(
source.status().unwrap().code,
google_cloud_gax::error::rpc::Code::Unknown,
"{got_err:?}"
);
}
fn generate_random_data() -> String {
rand::rng()
.sample_iter(&Alphanumeric)
.take(16)
.map(char::from)
.collect()
}
macro_rules! assert_publishing_is_ok {
($publisher:ident, $($ordering_key:expr),+) => {
$(
let msg = generate_random_data();
let got = $publisher
.publish(
Message::new()
.set_ordering_key($ordering_key)
.set_data(msg.clone()),
)
.await;
assert_eq!(got?, msg);
)+
};
}
macro_rules! assert_publishing_is_paused {
($publisher:ident, $($ordering_key:expr),+) => {
$(
let got_err = $publisher
.publish(
Message::new()
.set_ordering_key($ordering_key)
.set_data(generate_random_data()),
)
.await;
assert!(
matches!(got_err, Err(crate::error::PublishError::OrderingKeyPaused)),
"{got_err:?}"
);
)+
};
}
#[tokio_test_no_panics]
async fn publisher_publish_successfully() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.times(2)
.withf(|req, _o| req.topic == TOPIC)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1_u32)
.build();
let messages = [
Message::new().set_data("hello"),
Message::new().set_data("world"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push((msg, handle));
}
for (id, rx) in handles.into_iter() {
let got = rx.await?;
let id = String::from_utf8(id.data.to_vec())?;
assert_eq!(got, id);
}
Ok(())
}
#[tokio_test_no_panics]
async fn publisher_publish_successfully_with_arc() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.times(2)
.withf(|req, _o| req.topic == TOPIC)
.returning(publish_ok);
let mock_arc = std::sync::Arc::new(mock);
let client = GapicPublisher::from_stub::<MockGapicPublisher>(mock_arc);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1_u32)
.build();
let messages = [
Message::new().set_data("hello"),
Message::new().set_data("world"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push((msg, handle));
}
for (id, rx) in handles.into_iter() {
let got = rx.await?;
let id = String::from_utf8(id.data.to_vec())?;
assert_eq!(got, id);
}
Ok(())
}
#[tokio::test]
async fn publisher_publish_large_message() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_byte_threshold(1_u32)
.build();
assert_publishing_is_ok!(publisher, "");
assert_publishing_is_ok!(publisher, "key");
Ok(())
}
#[tokio::test(start_paused = true)]
async fn worker_handles_forced_shutdown_gracefully() -> anyhow::Result<()> {
let mock = MockGapicPublisher::new();
let client = GapicPublisher::from_stub(mock);
let (publisher, background_task_handle) =
PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(100_u32)
.build_return_handle();
let messages = [
Message::new().set_data("hello"),
Message::new().set_data("world"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg);
handles.push(handle);
}
background_task_handle.abort();
for rx in handles.into_iter() {
rx.await
.expect_err("expected error when background task canceled");
}
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn dropping_publisher_flushes_pending_messages() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(2)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1000_u32)
.set_delay_threshold(Duration::from_secs(60))
.build();
let start = tokio::time::Instant::now();
let messages = [
Message::new().set_data("hello"),
Message::new().set_data("world"),
Message::new().set_data("hello").set_ordering_key("key"),
Message::new().set_data("world").set_ordering_key("key"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push((msg, handle));
}
drop(publisher);
for (id, rx) in handles.into_iter() {
let got = rx.await?;
let id = String::from_utf8(id.data.to_vec())?;
assert_eq!(got, id);
assert_eq!(start.elapsed(), Duration::ZERO);
}
Ok(())
}
#[tokio_test_no_panics]
async fn publisher_handles_publish_errors() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.times(2)
.withf(|req, _o| req.topic == TOPIC)
.returning(publish_err);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1_u32)
.build();
let messages = [
Message::new().set_data("hello"),
Message::new().set_data("world"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push(handle);
}
for rx in handles.into_iter() {
let got = rx.await;
assert!(got.is_err(), "{got:?}");
}
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn flush_sends_pending_messages_immediately() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1000_u32)
.set_delay_threshold(Duration::from_secs(60))
.build();
let start = tokio::time::Instant::now();
let messages = [
Message::new().set_data("hello"),
Message::new().set_data("world"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push((msg, handle));
}
publisher.flush().await;
assert_eq!(start.elapsed(), Duration::ZERO);
let post = publisher.publish(Message::new().set_data("after"));
for (id, rx) in handles.into_iter() {
let got = rx.await?;
let id = String::from_utf8(id.data.to_vec())?;
assert_eq!(got, id);
assert_eq!(start.elapsed(), Duration::ZERO);
}
let got = post.await?;
assert_eq!(got, "after");
assert_eq!(start.elapsed(), Duration::from_secs(60));
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn dropping_handles_does_not_prevent_publishing() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|r, _| {
r.messages.len() == 2
&& r.messages[0].data == "hello"
&& r.messages[1].data == "world"
})
.return_once(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1000_u32)
.set_delay_threshold(Duration::from_secs(60))
.build();
let start = tokio::time::Instant::now();
let messages = [
Message::new().set_data("hello"),
Message::new().set_data("world"),
];
for msg in messages {
let handle = publisher.publish(msg.clone());
drop(handle);
}
publisher.flush().await;
assert_eq!(start.elapsed(), Duration::ZERO);
Ok(())
}
#[tokio::test(start_paused = true)]
async fn flush_with_no_messages_is_noop() -> anyhow::Result<()> {
let mock = MockGapicPublisher::new();
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
let start = tokio::time::Instant::now();
publisher.flush().await;
assert_eq!(start.elapsed(), Duration::ZERO);
Ok(())
}
#[tokio_test_no_panics]
async fn batch_sends_on_message_count_threshold_success() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|r, _| r.messages.len() == 2)
.return_once(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(2_u32)
.set_byte_threshold(MAX_BYTES)
.set_delay_threshold(std::time::Duration::MAX)
.build();
let messages = [
Message::new().set_data("hello"),
Message::new().set_data("world"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push((msg, handle));
}
for (id, rx) in handles.into_iter() {
let got = rx.await?;
let id = String::from_utf8(id.data.to_vec())?;
assert_eq!(got, id);
}
Ok(())
}
#[tokio_test_no_panics]
async fn batch_sends_on_message_count_threshold_error() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|r, _| r.messages.len() == 2)
.return_once(publish_err);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(2_u32)
.set_byte_threshold(MAX_BYTES)
.set_delay_threshold(std::time::Duration::MAX)
.build();
let messages = [
Message::new().set_data("hello"),
Message::new().set_data("world"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push(handle);
}
for rx in handles.into_iter() {
let got = rx.await;
assert!(got.is_err(), "{got:?}");
}
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn batch_sends_on_byte_threshold() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|r, _| r.messages.len() == 1)
.times(2)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let byte_threshold = TOPIC.len() + "hello".len() + "key".len() + 1;
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(MAX_MESSAGES)
.set_byte_threshold(byte_threshold as u32)
.set_delay_threshold(std::time::Duration::MAX)
.build();
let handle = publisher.publish(Message::new().set_data("hello"));
let _handle = publisher.publish(Message::new().set_data("world"));
assert_eq!(handle.await?, "hello");
let handle = publisher.publish(Message::new().set_data("hello").set_ordering_key("key"));
let _handle = publisher.publish(Message::new().set_data("world").set_ordering_key("key"));
assert_eq!(handle.await?, "hello");
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn batch_sends_on_delay_threshold() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _| req.topic == TOPIC)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let delay = std::time::Duration::from_millis(10);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(u32::MAX)
.set_byte_threshold(MAX_BYTES)
.set_delay_threshold(delay)
.build();
for _ in 0..3 {
let start = tokio::time::Instant::now();
let messages = [
Message::new().set_data("hello 0"),
Message::new().set_data("hello 1"),
Message::new()
.set_data("hello 2")
.set_ordering_key("ordering key 1"),
Message::new()
.set_data("hello 3")
.set_ordering_key("ordering key 2"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push((msg, handle));
}
for (id, rx) in handles.into_iter() {
let got = rx.await?;
let id = String::from_utf8(id.data.to_vec())?;
assert_eq!(got, id);
assert_eq!(
start.elapsed(),
delay,
"batch of messages should have sent after {:?}",
delay
)
}
}
Ok(())
}
#[tokio::test(start_paused = true)]
#[allow(clippy::get_first)]
async fn batching_separates_by_ordering_key() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|r, _| {
r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
})
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let message_count_threshold = 2_u32;
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(message_count_threshold)
.set_byte_threshold(MAX_BYTES)
.set_delay_threshold(std::time::Duration::MAX)
.build();
let num_ordering_keys = 3;
let mut messages = Vec::new();
for i in 0..(2 * message_count_threshold * num_ordering_keys) {
messages.push(
Message::new()
.set_data(format!("test message {}", i))
.set_ordering_key(format!("ordering key: {}", i % num_ordering_keys)),
);
}
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push((msg, handle));
}
for (id, rx) in handles.into_iter() {
let got = rx.await?;
let id = String::from_utf8(id.data.to_vec())?;
assert_eq!(got, id);
}
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
#[allow(clippy::get_first)]
async fn batching_handles_empty_ordering_key() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|r, _| {
r.messages.len() == 2 && r.messages[0].ordering_key == r.messages[1].ordering_key
})
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(2_u32)
.set_byte_threshold(MAX_BYTES)
.set_delay_threshold(std::time::Duration::MAX)
.build();
let messages = [
Message::new().set_data("hello 1"),
Message::new().set_data("hello 2").set_ordering_key(""),
Message::new()
.set_data("hello 3")
.set_ordering_key("ordering key :1"),
Message::new()
.set_data("hello 4")
.set_ordering_key("ordering key :1"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push((msg, handle));
}
for (id, rx) in handles.into_iter() {
let got = rx.await?;
let id = String::from_utf8(id.data.to_vec())?;
assert_eq!(got, id);
}
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
#[allow(clippy::get_first)]
async fn ordering_key_limits_to_one_outstanding_batch() -> anyhow::Result<()> {
let mut seq = Sequence::new();
let mut mock = MockGapicPublisherWithFuture::new();
mock.expect_publish()
.times(1)
.in_sequence(&mut seq)
.withf(|r, _| r.messages.len() == 1)
.returning({
|r, o| {
Box::pin(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
publish_ok(r, o)
})
}
});
mock.expect_publish()
.times(1)
.in_sequence(&mut seq)
.withf(|r, _| r.messages.len() == 1)
.returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1_u32)
.set_byte_threshold(MAX_BYTES)
.set_delay_threshold(std::time::Duration::MAX)
.build();
let messages = [
Message::new()
.set_data("hello 1")
.set_ordering_key("ordering key"),
Message::new()
.set_data("hello 2")
.set_ordering_key("ordering key"),
];
let start = tokio::time::Instant::now();
let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
assert_eq!(msg2_handle.await?, "hello 2");
assert_eq!(
start.elapsed(),
Duration::from_millis(10),
"the second batch of messages should have sent after the first which is has been delayed by {:?}",
Duration::from_millis(10)
);
assert_eq!(msg1_handle.await?, "hello 1");
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
#[allow(clippy::get_first)]
async fn empty_ordering_key_allows_concurrent_batches() -> anyhow::Result<()> {
let mut seq = Sequence::new();
let mut mock = MockGapicPublisherWithFuture::new();
mock.expect_publish()
.times(1)
.in_sequence(&mut seq)
.withf(|r, _| r.messages.len() == 1)
.returning(|r, o| {
Box::pin(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
publish_ok(r, o)
})
});
mock.expect_publish()
.times(1)
.in_sequence(&mut seq)
.withf(|r, _| r.topic == TOPIC && r.messages.len() == 1)
.returning(|r, o| Box::pin(async move { publish_ok(r, o) }));
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1_u32)
.set_byte_threshold(MAX_BYTES)
.set_delay_threshold(std::time::Duration::MAX)
.build();
let messages = [
Message::new().set_data("hello 1").set_ordering_key(""),
Message::new().set_data("hello 2").set_ordering_key(""),
];
let start = tokio::time::Instant::now();
let msg1_handle = publisher.publish(messages.get(0).unwrap().clone());
let msg2_handle = publisher.publish(messages.get(1).unwrap().clone());
assert_eq!(msg2_handle.await?, "hello 2");
assert_eq!(
start.elapsed(),
Duration::from_millis(0),
"the second batch of messages should have sent without any delay"
);
assert_eq!(msg1_handle.await?, "hello 1");
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn ordering_key_error_pauses_publisher() -> anyhow::Result<()> {
let mut seq = Sequence::new();
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(1)
.in_sequence(&mut seq)
.returning(publish_err);
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(2)
.in_sequence(&mut seq)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1_u32)
.build();
let key = "ordering_key";
let msg_0_handle =
publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
let msg_1_handle =
publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
let mut got_err = msg_0_handle.await.unwrap_err();
assert_publish_err(got_err);
got_err = msg_1_handle.await.unwrap_err();
assert!(
matches!(got_err, crate::error::PublishError::OrderingKeyPaused),
"{got_err:?}"
);
for _ in 0..3 {
assert_publishing_is_paused!(publisher, key);
}
assert_publishing_is_ok!(publisher, "", "without_error");
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn batch_error_pauses_ordering_key() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.times(1)
.withf(|r, _| r.topic == TOPIC && r.messages.len() == 2)
.returning(publish_err);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(2_u32)
.build();
let key = "ordering_key";
let msg_0_handle =
publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
let msg_1_handle =
publisher.publish(Message::new().set_ordering_key(key).set_data("msg 1"));
let mut got_err = msg_0_handle.await.unwrap_err();
assert_publish_err(got_err);
got_err = msg_1_handle.await.unwrap_err();
assert_publish_err(got_err);
assert_publishing_is_paused!(publisher, key);
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn flush_on_paused_ordering_key_returns_error() -> anyhow::Result<()> {
let mut seq = Sequence::new();
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(1)
.in_sequence(&mut seq)
.returning(publish_err);
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(2)
.in_sequence(&mut seq)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
let key = "ordering_key";
let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
publisher.flush().await;
let got_err = handle.await.unwrap_err();
assert_publish_err(got_err);
assert_publishing_is_paused!(publisher, key);
assert_publishing_is_ok!(publisher, "", "without_error");
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn resuming_non_paused_ordering_key_is_noop() -> anyhow::Result<()> {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(4)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
publisher.resume_publish("");
assert_publishing_is_ok!(publisher, "");
publisher.resume_publish("");
assert_publishing_is_ok!(publisher, "");
let key = "without_error";
publisher.resume_publish(key);
assert_publishing_is_ok!(publisher, key);
publisher.resume_publish(key);
assert_publishing_is_ok!(publisher, key);
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn resuming_paused_ordering_key_allows_publishing() -> anyhow::Result<()> {
let mut seq = Sequence::new();
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(1)
.in_sequence(&mut seq)
.returning(publish_err);
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(3)
.in_sequence(&mut seq)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
let key = "ordering_key";
let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
let got_err = handle.await.unwrap_err();
assert_publish_err(got_err);
assert_publishing_is_paused!(publisher, key);
publisher.resume_publish(key);
assert_publishing_is_ok!(publisher, key);
assert_publishing_is_ok!(publisher, "", "without_error");
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn resuming_ordering_key_twice_is_safe() -> anyhow::Result<()> {
let mut seq = Sequence::new();
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.in_sequence(&mut seq)
.times(1)
.returning(publish_err);
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.in_sequence(&mut seq)
.return_once(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
let key = "ordering_key";
let handle = publisher.publish(Message::new().set_ordering_key(key).set_data("msg 0"));
publisher.flush().await;
let got_err = handle.await.unwrap_err();
assert_publish_err(got_err);
assert_publishing_is_paused!(publisher, key);
publisher.resume_publish(key);
publisher.resume_publish(key);
assert_publishing_is_ok!(publisher, key);
Ok(())
}
#[tokio_test_no_panics(start_paused = true)]
async fn resuming_one_ordering_key_does_not_resume_others() -> anyhow::Result<()> {
let mut seq = Sequence::new();
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(2)
.in_sequence(&mut seq)
.returning(publish_err);
mock.expect_publish()
.withf(|req, _o| req.topic == TOPIC)
.times(1)
.in_sequence(&mut seq)
.returning(publish_ok);
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()).build();
let key_0 = "ordering_key_0";
let key_1 = "ordering_key_1";
let handle_0 = publisher.publish(Message::new().set_ordering_key(key_0).set_data("msg 0"));
let handle_1 = publisher.publish(Message::new().set_ordering_key(key_1).set_data("msg 1"));
publisher.flush().await;
let mut got_err = handle_0.await.unwrap_err();
assert_publish_err(got_err);
got_err = handle_1.await.unwrap_err();
assert_publish_err(got_err);
assert_publishing_is_paused!(publisher, key_0, key_1);
publisher.resume_publish(key_0);
assert_publishing_is_ok!(publisher, key_0);
assert_publishing_is_paused!(publisher, key_1);
Ok(())
}
#[tokio::test]
async fn publisher_builder_clamps_batching_options() -> anyhow::Result<()> {
let oversized_options = BatchingOptions::new()
.set_delay_threshold(MAX_DELAY + Duration::from_secs(1))
.set_message_count_threshold(MAX_MESSAGES + 1)
.set_byte_threshold(MAX_BYTES + 1);
let publishers = vec![
BasePublisher::builder()
.build()
.await?
.publisher("projects/my-project/topics/my-topic")
.set_delay_threshold(oversized_options.delay_threshold)
.set_message_count_threshold(oversized_options.message_count_threshold)
.set_byte_threshold(oversized_options.byte_threshold)
.build(),
Publisher::builder("projects/my-project/topics/my-topic".to_string())
.set_delay_threshold(oversized_options.delay_threshold)
.set_message_count_threshold(oversized_options.message_count_threshold)
.set_byte_threshold(oversized_options.byte_threshold)
.build()
.await?,
];
for publisher in publishers {
let got = publisher.batching_options;
assert_eq!(got.delay_threshold, MAX_DELAY);
assert_eq!(got.message_count_threshold, MAX_MESSAGES);
assert_eq!(got.byte_threshold, MAX_BYTES);
}
let normal_options = BatchingOptions::new()
.set_delay_threshold(Duration::from_secs(10))
.set_message_count_threshold(10_u32)
.set_byte_threshold(100_u32);
let publishers = vec![
BasePublisher::builder()
.build()
.await?
.publisher("projects/my-project/topics/my-topic")
.set_delay_threshold(normal_options.delay_threshold)
.set_message_count_threshold(normal_options.message_count_threshold)
.set_byte_threshold(normal_options.byte_threshold)
.build(),
Publisher::builder("projects/my-project/topics/my-topic".to_string())
.set_delay_threshold(normal_options.delay_threshold)
.set_message_count_threshold(normal_options.message_count_threshold)
.set_byte_threshold(normal_options.byte_threshold)
.build()
.await?,
];
for publisher in publishers {
let got = publisher.batching_options;
assert_eq!(got.delay_threshold, normal_options.delay_threshold);
assert_eq!(
got.message_count_threshold,
normal_options.message_count_threshold
);
assert_eq!(got.byte_threshold, normal_options.byte_threshold);
}
Ok(())
}
}