google-cloud-pubsub
Google Cloud Platform Pub/Sub library.
Installation
[dependencies]
google-cloud-pubsub = "<version>"
Quick Start
Publish Message
use google_cloud_pubsub::client::Client;
use google_cloud_gax::cancel::CancellationToken;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::topic::TopicConfig;
use google_cloud_pubsub::subscription::SubscriptionConfig;
use google_cloud_gax::grpc::Status;
use tokio::task::JoinHandle;
#[tokio::main]
async fn main() -> Result<(), Status> {
let mut client = Client::new("local-project", None).await.unwrap();
let topic = client.topic("test-topic");
if !topic.exists(None, None).await? {
topic.create(None, None, None).await?;
}
let publisher = topic.new_publisher(None);
let tasks : Vec<JoinHandle<Result<String,Status>>> = (0..10).into_iter().map(|_i| {
let publisher = publisher.clone();
tokio::spawn(async move {
let mut msg = PubsubMessage::default();
msg.data = "abc".into();
let mut awaiter = publisher.publish(msg).await;
awaiter.get(None).await
})
}).collect();
for task in tasks {
let message_id = task.await.unwrap()?;
}
let mut publisher = publisher;
publisher.shutdown();
Ok(())
}
Subscribe Message
use google_cloud_pubsub::client::Client;
use google_cloud_gax::cancel::CancellationToken;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::subscription::SubscriptionConfig;
use google_cloud_gax::grpc::Status;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Status> {
let mut client = Client::new("local-project", None).await.unwrap();
let topic = client.topic("test-topic");
let mut config = SubscriptionConfig::default();
config.enable_message_ordering = true;
let subscription = client.subscription("test-subscription");
if !subscription.exists(None, None).await? {
subscription.create(topic.fully_qualified_name(), config, None, None).await?;
}
let cancel = CancellationToken::new();
let cancel2 = cancel.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
cancel2.cancel();
});
subscription.receive(|mut message, cancel| async move {
let data = message.message.data.as_slice();
println!("{:?}", data);
message.ack().await;
}, cancel.clone(), None).await;
subscription.delete(None, None).await;
Ok(())
}
Example
Here is an example using Warp.