#![cfg(all(feature = "google", feature = "protobuf"))]
/// Boxed, thread-safe (`Send + Sync`) error type used as the error half of the `Result`
/// returned by the test functions in this file, so that `?` can be applied to any error.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
use crate::{
googlepubsub::{
retry_policy::{RetryOperation, RetryPolicy},
AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, PubSubError, PublishError,
StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName, TopicConfig, TopicName,
},
message,
validators::{
prost::{ExactSchemaMatcher, SchemaMismatchError},
ProstDecodeError, ProstDecoder, ProstValidator, ProstValidatorError,
},
Consumer, DecodableMessage, EncodableMessage, Headers, Publisher, Topic, ValidatedMessage,
};
use futures_util::{pin_mut, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use std::{
sync::mpsc,
task::{Context, Poll},
};
use ya_gcp::pubsub::emulator::Emulator;
/// Schema identifier passed to the validator in `TestMessage::encode` and to the
/// `ExactSchemaMatcher` instances built by the tests.
const SCHEMA: &str = "test-schema";
/// Topic returned by `TestMessage::topic`; also used as the Pub/Sub topic name in the
/// emulator-backed tests.
const TOPIC: &str = "test-topic";
/// Minimal protobuf message used as the message type in the tests of this file.
#[derive(Clone, PartialEq, Eq, prost::Message)]
struct TestMessage {
/// The message's single field: a protobuf `string` with tag 1.
#[prost(string, tag = "1")]
payload: String,
}
impl EncodableMessage for TestMessage {
    type Error = ProstValidatorError;
    type Validator = ProstValidator;

    /// Every `TestMessage` is published to the fixed [`TOPIC`].
    fn topic(&self) -> Topic {
        TOPIC.into()
    }

    /// Validates `self` with fixed metadata so the encoded output is deterministic: the nil
    /// UUID as id, the Unix epoch as timestamp, [`SCHEMA`] as schema and a single
    /// `"key" => "value"` header.
    fn encode(&self, validator: &Self::Validator) -> Result<ValidatedMessage, Self::Error> {
        let id = uuid::Uuid::nil();
        let timestamp = std::time::SystemTime::UNIX_EPOCH;
        let headers = Headers::from([(String::from("key"), String::from("value"))]);

        validator.validate(id, timestamp, SCHEMA, headers, self)
    }
}
impl DecodableMessage for TestMessage {
type Decoder = ProstDecoder<ExactSchemaMatcher<TestMessage>>;
type Error = ProstDecodeError<SchemaMismatchError>;
fn decode(msg: ValidatedMessage, validator: &Self::Decoder) -> Result<Self, Self::Error> {
validator.decode(msg)
}
}
/// Encodes a `TestMessage`, decodes it back as a `ValidatedMessage<TestMessage>` and checks
/// that the headers attached by `encode` are present on the decoded message.
#[test]
fn decode_with_headers() -> Result<(), BoxError> {
    let expected_headers = Headers::from([(String::from("key"), String::from("value"))]);

    let source = TestMessage {
        payload: String::from("foobar"),
    };
    let validator = ProstValidator::new();
    let wire_message = source.encode(&validator)?;

    let decoded = message::ValidatedMessage::<TestMessage>::decode(
        wire_message,
        &ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)),
    )?;

    assert_eq!(decoded.headers(), &expected_headers);
    Ok(())
}
/// Publishes one `TestMessage` through the Pub/Sub emulator and checks that the consumer
/// yields, and successfully acks, an equal message.
#[tokio::test]
#[ignore = "pubsub emulator is finicky, run this test manually"]
async fn roundtrip_protobuf() -> Result<(), BoxError> {
    let project = "test-project";
    let topic = TopicName::new(TOPIC);
    let subscription = SubscriptionName::new("test-subscription");

    // The emulator handle stays bound until the end of the test.
    let emulator = Emulator::new().project(project).await?;
    let builder = ClientBuilder::new(
        ClientBuilderConfig::new().auth_flow(AuthFlow::NoAuth),
        PubSubConfig::new().endpoint(emulator.endpoint()),
    )
    .await?;

    // Publisher side: create the topic that messages are sent to.
    let mut publisher_client = builder.build_publisher(project, "test_publisher").await?;
    let topic_config = TopicConfig {
        name: topic.clone(),
        ..TopicConfig::default()
    };
    publisher_client.create_topic(topic_config).await?;

    // Consumer side: create a subscription attached to that topic.
    let mut consumer_client = builder.build_consumer(project, "test_queue").await?;
    let subscription_config = SubscriptionConfig {
        name: subscription.clone(),
        topic: topic.clone(),
        ..SubscriptionConfig::default()
    };
    consumer_client
        .create_subscription(subscription_config)
        .await?;

    // Publish a single message through the sink.
    let mut sink =
        Publisher::<TestMessage>::publish_sink(publisher_client.publisher(), ProstValidator::new());
    sink.send(TestMessage {
        payload: String::from("foobar"),
    })
    .await?;

    // Pull it back from the subscription, ack it, and compare with what was sent.
    let stream = consumer_client
        .stream_subscription(subscription, StreamSubscriptionConfig::default())
        .consume::<TestMessage>(ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)));
    pin_mut!(stream);

    let delivered = Option::unwrap(stream.next().await)?;
    let acked = delivered.ack().await?;
    assert_eq!(
        TestMessage {
            payload: String::from("foobar")
        },
        acked
    );
    Ok(())
}
/// Exercises `Publisher::publish_sink_with_responses` against the Pub/Sub emulator.
///
/// The assertions check that each valid message fed to the publisher is forwarded to the
/// response sink and delivered to the consumer, while a message whose publish fails with
/// `InvalidArgument` is reported through `PublishError::Publish` and appears in neither place.
#[tokio::test]
#[ignore = "pubsub emulator is finicky, run this test manually"]
async fn response_sink_responses() -> Result<(), BoxError> {
// Fixture: emulator, a publisher client with a freshly created topic, and a consumer client
// with a subscription on that topic.
let project_name = "test-project";
let topic_name = TopicName::new(TOPIC);
let subscription_name = SubscriptionName::new("test-subscription");
let emulator = Emulator::new().project(project_name).await?;
let client_builder = ClientBuilder::new(
ClientBuilderConfig::new().auth_flow(AuthFlow::NoAuth),
PubSubConfig::new().endpoint(emulator.endpoint()),
)
.await?;
let mut publisher_client = client_builder
.build_publisher(project_name, "test_publisher")
.await?;
publisher_client
.create_topic(TopicConfig {
name: topic_name.clone(),
..TopicConfig::default()
})
.await?;
let mut consumer_client = client_builder
.build_consumer(project_name, "test_queue")
.await?;
consumer_client
.create_subscription(SubscriptionConfig {
name: subscription_name.clone(),
topic: topic_name.clone(),
..SubscriptionConfig::default()
})
.await?;
// Unbounded channel: the sender half is handed to the publisher as its response sink, the
// receiver half is inspected by the assertions below.
let (response_sink, mut responses) = futures_channel::mpsc::unbounded();
// Task context with a no-op waker, used for the manual one-shot polls in this test.
let mut cx = Context::from_waker(futures_util::task::noop_waker_ref());
let mut publisher = Publisher::<TestMessage, _>::publish_sink_with_responses(
publisher_client.publisher(),
ProstValidator::new(),
response_sink,
);
let consumer = consumer_client
.stream_subscription(subscription_name, StreamSubscriptionConfig::default())
.consume::<TestMessage>(ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)));
pin_mut!(consumer);
// Case 1: a single valid message.
{
let message = TestMessage {
payload: "foobar".into(),
};
publisher.feed(message.clone()).await?;
// After `feed` alone, nothing is expected on the response channel yet.
assert_eq!(Poll::Pending, responses.poll_next_unpin(&mut cx));
publisher.flush().await?;
// Once flushed, the message must be immediately available on the response channel...
assert_eq!(
Poll::Ready(Some(message.clone())),
responses.poll_next_unpin(&mut cx)
);
// ...and the consumer must receive (and ack) an equal message.
assert_eq!(message, Option::unwrap(consumer.next().await)?.ack().await?);
}
// Case 2: an invalid message fed after several valid ones.
{
let message1 = TestMessage {
payload: "one".into(),
};
let message2 = TestMessage {
payload: "two".into(),
};
let message3 = TestMessage {
payload: "three".into(),
};
// 10_000_001-byte payload. The assertions below expect publishing it to be rejected with
// `InvalidArgument` (presumably it exceeds a request size limit — TODO confirm).
let invalid_message4 = TestMessage {
payload: "4".repeat(10 * 1_000_000 + 1),
};
let message5 = TestMessage {
payload: "five".into(),
};
publisher.feed(message1.clone()).await?;
publisher.feed(message2.clone()).await?;
publisher.feed(message3.clone()).await?;
publisher.feed(invalid_message4.clone()).await?;
// A single readiness poll must surface the error, carrying only the invalid message.
match publisher.poll_ready_unpin(&mut cx) {
Poll::Ready(Err(PublishError::Publish { cause, messages })) => {
assert_eq!(vec![invalid_message4], messages);
assert_eq!(tonic::Code::InvalidArgument, cause.code());
}
other => panic!("expected invalid arg error, was {:?}", other),
}
// The sink is expected to keep accepting messages after the error.
publisher.feed(message5.clone()).await?;
// Nothing is expected on the response channel before the flush.
assert_eq!(Poll::Pending, responses.poll_next_unpin(&mut cx));
publisher.flush().await?;
// The four valid messages reach the response channel in feed order; message 4 is absent.
assert_eq!(
vec![
message1.clone(),
message2.clone(),
message3.clone(),
message5.clone()
],
responses.by_ref().take(4).collect::<Vec<_>>().await
);
// The consumer receives (and acks) the same four messages in the same order.
assert_eq!(
vec![
message1.clone(),
message2.clone(),
message3.clone(),
message5.clone()
],
consumer
.by_ref()
.take(4)
.map_err(BoxError::from)
.and_then(|msg| msg.ack().map_err(BoxError::from))
.try_collect::<Vec<_>>()
.await?
);
}
// Case 3: an invalid message whose failure is only reported on a later readiness poll.
{
let message6 = TestMessage {
payload: "six".into(),
};
let message7 = TestMessage {
payload: "seven".into(),
};
// 9_999_994-byte payload. Unlike case 2, the readiness poll right after feeding it is
// expected to succeed, and the `InvalidArgument` error only appears after another message
// has been started (presumably the payload alone fits under the limit but the encoded
// request does not — TODO confirm).
let invalid_message8 = TestMessage {
payload: "8".repeat(10 * 1_000_000 - 6),
};
let message9 = TestMessage {
payload: "nine".into(),
};
publisher.feed(message6.clone()).await?;
publisher.feed(message7.clone()).await?;
publisher.feed(invalid_message8.clone()).await?;
// The sink still reports itself ready at this point.
assert!(matches!(
publisher.poll_ready_unpin(&mut cx),
Poll::Ready(Ok(()))
));
publisher.start_send_unpin(message9.clone())?;
// Waiting for readiness now yields the error for message 8 only.
match futures_util::future::poll_fn(|cx| publisher.poll_ready_unpin(cx)).await {
Err(PublishError::Publish { cause, messages }) => {
assert_eq!(vec![invalid_message8], messages);
assert_eq!(tonic::Code::InvalidArgument, cause.code());
}
other => panic!("expected invalid arg error, was {:?}", other),
}
// Messages 6 and 7 arrive on the response channel before `flush` is called in this block.
assert_eq!(
vec![message6.clone(), message7.clone()],
responses.by_ref().take(2).collect::<Vec<_>>().await
);
publisher.flush().await?;
// Message 9 is forwarded after the flush.
assert_eq!(
vec![message9.clone()],
responses.by_ref().take(1).collect::<Vec<_>>().await
);
// The consumer receives (and acks) only the three valid messages, in feed order.
assert_eq!(
vec![message6.clone(), message7.clone(), message9.clone()],
consumer
.by_ref()
.take(3)
.map_err(BoxError::from)
.and_then(|msg| msg.ack().map_err(BoxError::from))
.try_collect::<Vec<_>>()
.await?
);
}
Ok(())
}
/// Checks that publish failures are reported in terms of the `TestMessage` values that were fed.
///
/// A retry policy that records every failed batch is installed on the publisher. After the
/// emulator handle is dropped, the test asserts that the retry policy and the returned
/// `PublishError::Publish` both see the same batches of the originally fed messages.
#[tokio::test]
#[ignore = "pubsub emulator is finicky, run this test manually"]
async fn retry_message_translate() -> Result<(), BoxError> {
// Fixture: emulator and a publisher client with a freshly created topic. No consumer is
// created in this test.
let project_name = "roundtrip-test-project";
let topic_name = TopicName::new(TOPIC);
let emulator = Emulator::new().project(project_name).await?;
let client_builder = ClientBuilder::new(
ClientBuilderConfig::new().auth_flow(AuthFlow::NoAuth),
PubSubConfig::new().endpoint(emulator.endpoint()),
)
.await?;
let mut publisher_client = client_builder
.build_publisher(project_name, "roundtrip_test_publisher")
.await?;
publisher_client
.create_topic(TopicConfig {
name: topic_name.clone(),
..TopicConfig::default()
})
.await?;
/// Retry policy that gives each new operation a clone of the channel sender.
#[derive(Clone)]
struct TestRetryPolicy {
sender: mpsc::Sender<Vec<TestMessage>>,
}
/// Retry operation that reports every failed batch it is asked about over the channel.
struct TestRetryOperation {
sender: mpsc::Sender<Vec<TestMessage>>,
}
impl RetryPolicy<[TestMessage], PubSubError> for TestRetryPolicy {
type RetryOp = TestRetryOperation;
fn new_operation(&mut self) -> Self::RetryOp {
TestRetryOperation {
sender: self.sender.clone(),
}
}
}
impl RetryOperation<[TestMessage], PubSubError> for TestRetryOperation {
type Sleep = futures_util::future::Ready<()>;
// Sends an owned copy of the failed messages to the test and returns `None`, i.e. no
// `Sleep` future (presumably meaning the operation is not retried — TODO confirm).
fn check_retry(
&mut self,
failed_value: &[TestMessage],
_error: &PubSubError,
) -> Option<Self::Sleep> {
self.sender
.send(failed_value.to_owned())
.expect("receiver should not be dropped while senders in use");
None
}
}
// Payload sizes: 2_000_000, 2_000_000 and 8_000_000 bytes, then a single byte. The assertions
// below expect messages 1+2 and messages 3+4 to fail as two separate batches (presumably a
// consequence of size-based batching in the publisher — TODO confirm).
let message1 = TestMessage {
payload: "1".repeat(2 * 1_000_000),
};
let message2 = TestMessage {
payload: "2".repeat(2 * 1_000_000),
};
let message3 = TestMessage {
payload: "3".repeat(8 * 1_000_000),
};
let message4 = TestMessage {
payload: "4".into(),
};
// Standard-library channel over which the retry operations report failed batches.
let (retry_tx, retry_rx) = mpsc::channel();
let mut publisher = Publisher::<TestMessage>::publish_sink(
publisher_client
.publisher()
.with_retry_policy(TestRetryPolicy { sender: retry_tx }),
ProstValidator::new(),
);
publisher.feed(message1.clone()).await?;
publisher.feed(message2.clone()).await?;
publisher.feed(message3.clone()).await?;
publisher.feed(message4.clone()).await?;
// The retry policy is expected not to have been consulted yet.
assert_eq!(Err(mpsc::TryRecvError::Empty), retry_rx.try_recv());
// Drop the emulator handle; the remaining assertions expect publishing to fail from here on
// (presumably dropping the handle shuts the emulator down — TODO confirm).
std::mem::drop(emulator);
// Dropping the handle by itself reports nothing to the retry policy.
assert_eq!(Err(mpsc::TryRecvError::Empty), retry_rx.try_recv());
// Waiting for readiness fails with the first batch: messages 1 and 2.
match futures_util::future::poll_fn(|cx| publisher.poll_ready_unpin(cx)).await {
Err(PublishError::Publish { cause: _, messages }) => {
assert_eq!(vec![message1.clone(), message2.clone()], messages);
}
other => panic!("expected publish error, was {:?}", other),
}
// The retry policy saw exactly that batch, and nothing else so far.
assert_eq!(Ok(vec![message1, message2]), retry_rx.try_recv());
assert_eq!(Err(mpsc::TryRecvError::Empty), retry_rx.try_recv());
// Flushing fails with the second batch: messages 3 and 4.
match publisher.flush().await {
Err(PublishError::Publish { cause: _, messages }) => {
assert_eq!(vec![message3.clone(), message4.clone()], messages);
}
other => panic!("expected publish error, was {:?}", other),
}
// The retry policy observed the second batch as well.
assert_eq!(Ok(vec![message3, message4]), retry_rx.try_recv());
Ok(())
}