google_cloud_pubsub/lib.rs
1//! # google-cloud-pubsub
2//!
3//! Google Cloud Platform pub/sub library.
4//!
5//! * [About Cloud Pub/Sub](https://cloud.google.com/pubsub/)
6//! * [Pub/Sub API Documentation](https://cloud.google.com/pubsub/docs)
7//!
8//! ## Quickstart
9//!
10//! ### Authentication
11//! There are two ways to create a client that is authenticated against the google cloud.
12//!
13//! #### Automatically
14//!
15//! The function `with_auth()` will try and read the credentials from a file specified in the environment variable `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_APPLICATION_CREDENTIALS_JSON` or
16//! from a metadata server.
17//!
18//! This is also described in [google-cloud-auth](https://github.com/yoshidan/google-cloud-rust/blob/main/foundation/auth/README.md)
19//!
20//! ```rust
21//! use google_cloud_pubsub::client::{ClientConfig, Client};
22//!
23//! async fn run() {
24//! let config = ClientConfig::default().with_auth().await.unwrap();
25//! let client = Client::new(config).await.unwrap();
26//! }
27//! ```
28//!
29//! ### Manually
30//!
31//! When you cant use the `gcloud` authentication but you have a different way to get your credentials (e.g a different environment variable)
32//! you can parse your own version of the 'credentials-file' and use it like that:
33//!
34//! ```rust
35//! use google_cloud_auth::credentials::CredentialsFile;
36//! // or google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile
37//! use google_cloud_pubsub::client::{ClientConfig, Client};
38//!
39//! async fn run(cred: CredentialsFile) {
40//! let config = ClientConfig::default().with_credentials(cred).await.unwrap();
41//! let client = Client::new(config).await.unwrap();
42//! }
43//! ```
44//!
45//! ### Emulator
46//! For tests, you can use the [Emulator-Option](https://github.com/yoshidan/google-cloud-rust/blob/cbd5ed1315d7b828c89a50fe71fcbaf15ddc964b/pubsub/src/client.rs#L32) like that:
47//! Before executing the program, specify the address of the emulator in the following environment variable.
48//!
49//! ```sh
50//! export PUBSUB_EMULATOR_HOST=localhost:8681
51//! ```
52//!
53//! ### Publish Message
54//!
55//! ```
56//! use google_cloud_pubsub::client::{Client, ClientConfig};
57//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
58//! use google_cloud_pubsub::topic::TopicConfig;
59//! use google_cloud_pubsub::subscription::SubscriptionConfig;
60//! use google_cloud_gax::grpc::Status;
61//! use tokio::task::JoinHandle;
62//! use tokio_util::sync::CancellationToken;
63//!
64//! async fn run(config: ClientConfig) -> Result<(), Status> {
65//!
66//! // Create pubsub client.
67//! let client = Client::new(config).await.unwrap();
68//!
69//! // Create topic.
70//! let topic = client.topic("test-topic");
71//! if !topic.exists(None).await? {
72//! topic.create(None, None).await?;
73//! }
74//!
75//! // Start publisher.
76//! let publisher = topic.new_publisher(None);
77//!
78//! // Publish message.
79//! let tasks : Vec<JoinHandle<Result<String,Status>>> = (0..10).into_iter().map(|_i| {
80//! let publisher = publisher.clone();
81//! tokio::spawn(async move {
82//! let msg = PubsubMessage {
83//! data: "abc".into(),
84//! // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering)
85//! ordering_key: "order".into(),
86//! ..Default::default()
87//! };
88//!
89//! // Send a message. There are also `publish_bulk` and `publish_immediately` methods.
90//! let mut awaiter = publisher.publish(msg).await;
91//!
92//! // The get method blocks until a server-generated ID or an error is returned for the published message.
93//! awaiter.get().await
94//! })
95//! }).collect();
96//!
97//! // Wait for all publish task finish
98//! for task in tasks {
99//! let message_id = task.await.unwrap()?;
100//! }
101//!
102//! // Wait for publishers in topic finish.
103//! let mut publisher = publisher;
104//! publisher.shutdown();
105//!
106//! Ok(())
107//! }
108//! ```
109//!
110//! ### Subscribe Message
111//!
112//! ```
113//! use google_cloud_pubsub::client::{Client, ClientConfig};
114//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
115//! use google_cloud_pubsub::subscription::SubscriptionConfig;
116//! use google_cloud_gax::grpc::Status;
117//! use std::time::Duration;
118//! use tokio_util::sync::CancellationToken;
119//! use futures_util::StreamExt;
120//!
121//! async fn run(config: ClientConfig) -> Result<(), Status> {
122//!
123//! // Create pubsub client.
124//! let client = Client::new(config).await.unwrap();
125//!
126//! // Get the topic to subscribe to.
127//! let topic = client.topic("test-topic");
128//!
129//! // Create subscription
130//! // If subscription name does not contain a "/", then the project is taken from client above. Otherwise, the
131//! // name will be treated as a fully qualified resource name
132//! let config = SubscriptionConfig {
133//! // Enable message ordering if needed (https://cloud.google.com/pubsub/docs/ordering)
134//! enable_message_ordering: true,
135//! ..Default::default()
136//! };
137//!
138//! // Create subscription
139//! let subscription = client.subscription("test-subscription");
140//! if !subscription.exists(None).await? {
141//! subscription.create(topic.fully_qualified_name(), config, None).await?;
142//! }
143//!
144//! // Token for cancel.
145//! let cancel = CancellationToken::new();
146//! let cancel2 = cancel.clone();
147//! tokio::spawn(async move {
148//! // Cancel after 10 seconds.
149//! tokio::time::sleep(Duration::from_secs(10)).await;
150//! cancel2.cancel();
151//! });
152//!
153//! // Receive blocks until the ctx is cancelled or an error occurs.
154//! // Or simply use the `subscription.subscribe` method.
155//! subscription.receive(|mut message, cancel| async move {
156//! // Handle data.
157//! println!("Got Message: {:?}", message.message.data);
158//!
159//! // Ack or Nack message.
160//! let _ = message.ack().await;
161//! }, cancel.clone(), None).await?;
162//!
163//! // Delete subscription if needed.
164//! subscription.delete(None).await?;
165//!
166//! Ok(())
167//! }
168//! ```
169//!
170//! ### Subscribe Message (Alternative Way)
171//!
172//! After canceling, wait until all pulled messages are processed.
173//! ```
174//! use std::time::Duration;
175//! use futures_util::StreamExt;
176//! use google_cloud_pubsub::client::{Client, ClientConfig};
177//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
178//! use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig};
179//! use google_cloud_gax::grpc::Status;
180//!
181//! async fn run(config: ClientConfig) -> Result<(), Status> {
182//! // Creating Client, Topic and Subscription...
183//! let client = Client::new(config).await.unwrap();
184//! let subscription = client.subscription("test-subscription");
185//!
186//! // Read the messages as a stream
187//! let mut stream = subscription.subscribe(None).await.unwrap();
188//! let cancellable = stream.cancellable();
189//! let task = tokio::spawn(async move {
190//! // None if the stream is cancelled
191//! while let Some(message) = stream.next().await {
192//! message.ack().await.unwrap();
193//! }
194//! });
195//! tokio::time::sleep(Duration::from_secs(60)).await;
196//! cancellable.cancel();
197//! let _ = task.await;
198//! Ok(())
199//! }
200//! ```
201//!
202//! Unprocessed messages are nack after cancellation.
203//! ```
204//! use std::time::Duration;
205//! use google_cloud_pubsub::client::{Client, ClientConfig};
206//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
207//! use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig};
208//! use google_cloud_gax::grpc::Status;
209//!
210//! async fn run(config: ClientConfig) -> Result<(), Status> {
211//! // Creating Client, Topic and Subscription...
212//! let client = Client::new(config).await.unwrap();
213//! let subscription = client.subscription("test-subscription");
214//!
215//! // Read the messages as a stream
216//! let mut stream = subscription.subscribe(None).await.unwrap();
217//! let cancellable = stream.cancellable();
218//! let task = tokio::spawn(async move {
219//! // None if the tream is cancelled
220//! while let Some(message) = stream.read().await {
221//! message.ack().await.unwrap();
222//! }
223//! });
224//! tokio::time::sleep(Duration::from_secs(60)).await;
225//! cancellable.cancel();
226//! let _ = task.await;
227//! Ok(())
228//! }
229//! ```
230pub mod apiv1;
231pub mod client;
232pub mod publisher;
233pub mod subscriber;
234pub mod subscription;
235pub mod topic;
236pub mod util;