google-cloud-pubsub
Google Cloud Platform pub/sub library.
Installation
[]
= "version"
Quickstart
Authentication
There are two ways to create a client that is authenticated against the google cloud.
Automatically
The function with_auth() will try and read the credentials from a file specified in the environment variable GOOGLE_APPLICATION_CREDENTIALS, GOOGLE_APPLICATION_CREDENTIALS_JSON or
from a metadata server.
This is also described in google-cloud-auth
use ;
async
Manually
When you can't use the gcloud authentication but you have a different way to get your credentials (e.g a different environment variable)
you can parse your own version of the 'credentials-file' and use it like that:
use CredentialsFile;
// or google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile
use ;
async
Emulator
For tests, you can use the Emulator-Option like that: Before executing the program, specify the address of the emulator in the following environment variable.
Publish Message
use ;
use PubsubMessage;
use TopicConfig;
use SubscriptionConfig;
use Status;
use JoinHandle;
use CancellationToken;
async
Subscribe Message
use ;
use PubsubMessage;
use SubscriptionConfig;
use Status;
use Duration;
use CancellationToken;
use StreamExt;
async
Subscribe Message (Alternative Way)
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::subscription::SubscriptionConfig;
use google_cloud_gax::grpc::Status;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use futures_util::StreamExt;
async fn run(config: ClientConfig) -> Result<(), Status> {
// Creating Client, Topic and Subscription...
let client = Client::new(config).await.unwrap();
let subscription = client.subscription("test-subscription");
// Read the messages as a stream
// (needs futures_util::StreamExt as import)
// Note: This blocks the current thread but helps working with non clonable data
let mut stream = subscription.subscribe(None).await?;
while let Some(message) = stream.next().await {
// Handle data.
println!("Got Message: {:?}", message.message);
// Ack or Nack message.
let _ = message.ack().await;
}
Ok(())
}