google-cloud-pubsub
Google Cloud Platform pub/sub library.

Installation
[dependencies]
google-cloud-pubsub = "version"
Quickstart
Authentication
There are two ways to create a client that is authenticated against Google Cloud.
Automatically
The function with_auth() will try to read the credentials from a file specified in the environment variable GOOGLE_APPLICATION_CREDENTIALS or GOOGLE_APPLICATION_CREDENTIALS_JSON, or
from a metadata server.
This is also described in google-cloud-auth.
use google_cloud_pubsub::client::{ClientConfig, Client};
// Creates a client whose credentials are discovered automatically.
async fn run() {
    // Start from the default configuration, then let `with_auth()` resolve the
    // credentials (environment variables or a metadata server).
    let base_config = ClientConfig::default();
    let authenticated_config = base_config.with_auth().await.unwrap();

    // Build the client from the authenticated configuration.
    let client = Client::new(authenticated_config).await.unwrap();
}
Manually
When you can't use the gcloud authentication but have a different way to get your credentials (e.g. a different environment variable),
you can parse your own version of the 'credentials-file' and use it like this:
use google_cloud_auth::credentials::CredentialsFile;
use google_cloud_pubsub::client::{ClientConfig, Client};
// Creates a client from a credentials file that the caller has already parsed.
async fn run(cred: CredentialsFile) {
    // Attach the caller-supplied credentials to the default configuration.
    let base_config = ClientConfig::default();
    let authenticated_config = base_config.with_credentials(cred).await.unwrap();

    // Build the client from the authenticated configuration.
    let client = Client::new(authenticated_config).await.unwrap();
}
Emulator
For tests, you can use the emulator option like this:
Before executing the program, specify the address of the emulator in the following environment variable.
export PUBSUB_EMULATOR_HOST=localhost:8681
Publish Message
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::topic::TopicConfig;
use google_cloud_pubsub::subscription::SubscriptionConfig;
use google_cloud_gax::grpc::Status;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
// Publishes ten messages to "test-topic" from concurrent tasks and waits for
// every publish result before shutting the publisher down.
//
// Returns the first `Status` error reported by the topic calls or by any of
// the publish tasks. Panics if the client cannot be created or a task panics.
async fn run(config: ClientConfig) -> Result<(), Status> {
    // Create the Pub/Sub client.
    let client = Client::new(config).await.unwrap();

    // Look up the topic and create it if it does not exist yet.
    let topic = client.topic("test-topic");
    if !topic.exists(None).await? {
        topic.create(None, None).await?;
    }

    // Start a publisher for the topic.
    let publisher = topic.new_publisher(None);

    // Publish each message from its own task; every task owns a clone of the
    // publisher. A range is already an iterator, so no `.into_iter()` is needed.
    let tasks: Vec<JoinHandle<Result<String, Status>>> = (0..10)
        .map(|_i| {
            let publisher = publisher.clone();
            tokio::spawn(async move {
                let msg = PubsubMessage {
                    data: "abc".into(),
                    // Every message uses the same ordering key.
                    ordering_key: "order".into(),
                    ..Default::default()
                };

                // Hand the message to the publisher, then wait for its result.
                let mut awaiter = publisher.publish(msg).await;
                awaiter.get().await
            })
        })
        .collect();

    // Wait for all publish tasks to finish; a failed publish is propagated.
    for task in tasks {
        // The value is unused in this example, hence the leading underscore.
        let _message_id = task.await.unwrap()?;
    }

    // Shut the publisher down once every result has been collected.
    // `shutdown` is called through a mutable binding, so rebind it here.
    let mut publisher = publisher;
    publisher.shutdown();

    Ok(())
}
Subscribe Message
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::subscription::SubscriptionConfig;
use google_cloud_gax::grpc::Status;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use futures_util::StreamExt;
// Receives messages from "test-subscription" through a callback for ten
// seconds, then deletes the subscription.
async fn run(config: ClientConfig) -> Result<(), Status> {
    // Create the Pub/Sub client.
    let client = Client::new(config).await.unwrap();

    // The topic the subscription is attached to.
    let topic = client.topic("test-topic");

    // Subscription settings: message ordering on, everything else default.
    let subscription_config = SubscriptionConfig {
        enable_message_ordering: true,
        ..Default::default()
    };

    // Create the subscription unless it already exists.
    let subscription = client.subscription("test-subscription");
    let already_exists = subscription.exists(None).await?;
    if !already_exists {
        subscription
            .create(topic.fully_qualified_name(), subscription_config, None)
            .await?;
    }

    // Cancel the token from a background task after 10 seconds.
    let cancel = CancellationToken::new();
    let canceller = cancel.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(10)).await;
        canceller.cancel();
    });

    // Print and acknowledge each received message; the ack result is ignored.
    subscription
        .receive(
            |mut message, cancel| async move {
                println!("Got Message: {:?}", message.message.data);
                let _ = message.ack().await;
            },
            cancel.clone(),
            None,
        )
        .await?;

    // Remove the subscription once receiving has finished.
    subscription.delete(None).await?;

    Ok(())
}
Subscribe Message (Alternative Way)
After canceling, wait until all pulled messages are processed.
use std::time::Duration;
use futures_util::StreamExt;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig};
use google_cloud_gax::grpc::Status;
// Consumes "test-subscription" as a stream via `next()` for 60 seconds.
// This is the variant described as: after cancelling, wait until all pulled
// messages are processed.
async fn run(config: ClientConfig) -> Result<(), Status> {
    // Create the Pub/Sub client and open a message stream on the subscription.
    let client = Client::new(config).await.unwrap();
    let subscription = client.subscription("test-subscription");
    let mut messages = subscription.subscribe(None).await.unwrap();

    // Keep a cancellation handle; the stream itself moves into the task below.
    let canceller = messages.cancellable();

    // Acknowledge every message until the stream yields `None`.
    let worker = tokio::spawn(async move {
        loop {
            match messages.next().await {
                Some(received) => received.ack().await.unwrap(),
                None => break,
            }
        }
    });

    // Let the worker run for a minute, then request cancellation.
    tokio::time::sleep(Duration::from_secs(60)).await;
    canceller.cancel();

    // Wait for the worker to finish; its join result is ignored.
    let _ = worker.await;
    Ok(())
}
Unprocessed messages are nacked after cancellation.
use std::time::Duration;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig};
use google_cloud_gax::grpc::Status;
// Consumes "test-subscription" via `read()` for 60 seconds.
// This is the variant described as: unprocessed messages are nacked after
// cancellation.
async fn run(config: ClientConfig) -> Result<(), Status> {
    // Create the Pub/Sub client and open a message stream on the subscription.
    let client = Client::new(config).await.unwrap();
    let subscription = client.subscription("test-subscription");
    let mut messages = subscription.subscribe(None).await.unwrap();

    // Keep a cancellation handle; the stream itself moves into the task below.
    let canceller = messages.cancellable();

    // Acknowledge every message until `read()` yields `None`.
    let worker = tokio::spawn(async move {
        loop {
            match messages.read().await {
                Some(received) => received.ack().await.unwrap(),
                None => break,
            }
        }
    });

    // Let the worker run for a minute, then request cancellation.
    tokio::time::sleep(Duration::from_secs(60)).await;
    canceller.cancel();

    // Wait for the worker to finish; its join result is ignored.
    let _ = worker.await;
    Ok(())
}