use std::io::{self, Write};
use serde::{Deserialize, Serialize};
use crate::pubsub;
macro_rules! assert_ok {
($expr:expr) => {
match $expr {
Ok(value) => value,
Err(err) => {
panic!("asserted result is an error: {}", err);
}
}
};
}
macro_rules! assert_some {
($expr:expr) => {
match $expr {
Some(value) => value,
None => {
panic!("asserted option is an none");
}
}
};
}
async fn setup_client() -> Result<pubsub::Client, pubsub::Error> {
let creds = super::load_creds();
pubsub::Client::from_credentials(env!("GCP_TEST_PROJECT"), creds).await
}
#[tokio::test]
async fn pubsub_lists_topics() {
let mut client = assert_ok!(setup_client().await);
assert_ok!(client.topics().await);
}
#[tokio::test]
async fn pubsub_sends_and_receives_message_successfully() {
let mut client = assert_ok!(setup_client().await);
print!("acquiring topic... ");
io::stdout().flush().unwrap();
let config = pubsub::TopicConfig::default();
let topic = match client.create_topic(env!("GCP_TEST_TOPIC"), config).await {
Ok(topic) => Ok(Some(topic)),
Err(_) => client.topic(env!("GCP_TEST_TOPIC")).await,
};
let mut topic = assert_some!(assert_ok!(topic));
println!("OK !");
print!("acquiring subscription... ");
io::stdout().flush().unwrap();
let config = pubsub::SubscriptionConfig::default();
let subscription = match topic
.create_subscription(env!("GCP_TEST_SUBSCRIPTION"), config)
.await
{
Ok(subscription) => Ok(Some(subscription)),
Err(_) => client.subscription(env!("GCP_TEST_SUBSCRIPTION")).await,
};
let mut subscription = assert_some!(assert_ok!(subscription));
println!("OK !");
print!("serializing message... ");
io::stdout().flush().unwrap();
#[derive(Serialize, Deserialize)]
struct Message<'a> {
name: &'a str,
value: &'a str,
}
let data = Message {
name: "hello",
value: "world !",
};
let message = assert_ok!(json::to_vec(&data));
println!("OK !");
print!("sending message... ");
io::stdout().flush().unwrap();
assert_ok!(topic.publish(message).await);
println!("OK !");
print!("receiving message... ");
io::stdout().flush().unwrap();
let mut received = assert_some!(subscription.receive().await);
println!("OK !");
print!("acknowledging message... ");
io::stdout().flush().unwrap();
assert_ok!(received.ack().await);
println!("OK !");
print!("deserializing message... ");
io::stdout().flush().unwrap();
assert_ok!(json::from_slice::<Message>(received.data()));
println!("OK !");
let received = subscription
.receive_with_options(pubsub::ReceiveOptions {
return_immediately: true,
max_messages: 1,
})
.await;
assert_eq!(received.is_none(), true);
println!("OK !");
print!("deleting subscription... ");
io::stdout().flush().unwrap();
assert_ok!(subscription.delete().await);
println!("OK !");
print!("deleting topic... ");
io::stdout().flush().unwrap();
assert_ok!(topic.delete().await);
println!("OK !");
}