google_cloud_pubsub/
lib.rs

1//! # google-cloud-pubsub
2//!
3//! Google Cloud Platform pub/sub library.
4//!
5//! * [About Cloud Pub/Sub](https://cloud.google.com/pubsub/)
6//! * [Pub/Sub API Documentation](https://cloud.google.com/pubsub/docs)
7//!
8//! ## Quickstart
9//!
10//! ### Authentication
11//! There are two ways to create a client that is authenticated against the google cloud.
12//!
13//! #### Automatically
14//!
15//! The function `with_auth()` will try and read the credentials from a file specified in the environment variable `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_APPLICATION_CREDENTIALS_JSON` or
16//! from a metadata server.
17//!
18//! This is also described in [google-cloud-auth](https://github.com/yoshidan/google-cloud-rust/blob/main/foundation/auth/README.md)
19//!
20//! ```rust
21//! use google_cloud_pubsub::client::{ClientConfig, Client};
22//!
23//! async fn run() {
24//!     let config = ClientConfig::default().with_auth().await.unwrap();
25//!     let client = Client::new(config).await.unwrap();
26//! }
27//! ```
28//!
29//! ### Manually
30//!
31//! When you cant use the `gcloud` authentication but you have a different way to get your credentials (e.g a different environment variable)
32//! you can parse your own version of the 'credentials-file' and use it like that:
33//!
34//! ```rust
35//! use google_cloud_auth::credentials::CredentialsFile;
36//! // or google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile
37//! use google_cloud_pubsub::client::{ClientConfig, Client};
38//!
39//! async fn run(cred: CredentialsFile) {
40//!     let config = ClientConfig::default().with_credentials(cred).await.unwrap();
41//!     let client = Client::new(config).await.unwrap();
42//! }
43//! ```
44//!
45//! ### Emulator
46//! For tests, you can use the [Emulator-Option](https://github.com/yoshidan/google-cloud-rust/blob/cbd5ed1315d7b828c89a50fe71fcbaf15ddc964b/pubsub/src/client.rs#L32) like that:
47//! Before executing the program, specify the address of the emulator in the following environment variable.
48//!
49//! ```sh
50//! export PUBSUB_EMULATOR_HOST=localhost:8681
51//! ```
52//!
53//! ### Publish Message
54//!
55//! ```
56//! use google_cloud_pubsub::client::{Client, ClientConfig};
57//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
58//! use google_cloud_pubsub::topic::TopicConfig;
59//! use google_cloud_pubsub::subscription::SubscriptionConfig;
60//! use google_cloud_gax::grpc::Status;
61//! use tokio::task::JoinHandle;
62//! use tokio_util::sync::CancellationToken;
63//!
64//! async fn run(config: ClientConfig) -> Result<(), Status> {
65//!
66//!     // Create pubsub client.
67//!     let client = Client::new(config).await.unwrap();
68//!
69//!     // Create topic.
70//!     let topic = client.topic("test-topic");
71//!     if !topic.exists(None).await? {
72//!         topic.create(None, None).await?;
73//!     }
74//!
75//!     // Start publisher.
76//!     let publisher = topic.new_publisher(None);
77//!
78//!     // Publish message.
79//!     let tasks : Vec<JoinHandle<Result<String,Status>>> = (0..10).into_iter().map(|_i| {
80//!         let publisher = publisher.clone();
81//!         tokio::spawn(async move {
82//!             let msg = PubsubMessage {
83//!                data: "abc".into(),
84//!                // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering)
85//!                ordering_key: "order".into(),
86//!                ..Default::default()
87//!             };
88//!
89//!             // Send a message. There are also `publish_bulk` and `publish_immediately` methods.
90//!             let mut awaiter = publisher.publish(msg).await;
91//!
92//!             // The get method blocks until a server-generated ID or an error is returned for the published message.
93//!             awaiter.get().await
94//!         })
95//!     }).collect();
96//!
97//!     // Wait for all publish task finish
98//!     for task in tasks {
99//!         let message_id = task.await.unwrap()?;
100//!     }
101//!
102//!     // Wait for publishers in topic finish.
103//!     let mut publisher = publisher;
104//!     publisher.shutdown();
105//!
106//!     Ok(())
107//! }
108//! ```
109//!
110//! ### Subscribe Message
111//!
112//! ```
113//! use google_cloud_pubsub::client::{Client, ClientConfig};
114//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
115//! use google_cloud_pubsub::subscription::SubscriptionConfig;
116//! use google_cloud_gax::grpc::Status;
117//! use std::time::Duration;
118//! use tokio_util::sync::CancellationToken;
119//! use futures_util::StreamExt;
120//!
121//! async fn run(config: ClientConfig) -> Result<(), Status> {
122//!
123//!     // Create pubsub client.
124//!     let client = Client::new(config).await.unwrap();
125//!
126//!     // Get the topic to subscribe to.
127//!     let topic = client.topic("test-topic");
128//!
129//!     // Create subscription
130//!     // If subscription name does not contain a "/", then the project is taken from client above. Otherwise, the
131//!     // name will be treated as a fully qualified resource name
132//!     let config = SubscriptionConfig {
133//!         // Enable message ordering if needed (https://cloud.google.com/pubsub/docs/ordering)
134//!         enable_message_ordering: true,
135//!         ..Default::default()
136//!     };
137//!
138//!     // Create subscription
139//!     let subscription = client.subscription("test-subscription");
140//!     if !subscription.exists(None).await? {
141//!         subscription.create(topic.fully_qualified_name(), config, None).await?;
142//!     }
143//!
144//!     // Token for cancel.
145//!     let cancel = CancellationToken::new();
146//!     let cancel2 = cancel.clone();
147//!     tokio::spawn(async move {
148//!         // Cancel after 10 seconds.
149//!         tokio::time::sleep(Duration::from_secs(10)).await;
150//!         cancel2.cancel();
151//!     });
152//!
153//!     // Receive blocks until the ctx is cancelled or an error occurs.
154//!     // Or simply use the `subscription.subscribe` method.
155//!     subscription.receive(|mut message, cancel| async move {
156//!         // Handle data.
157//!         println!("Got Message: {:?}", message.message.data);
158//!
159//!         // Ack or Nack message.
160//!         let _ = message.ack().await;
161//!     }, cancel.clone(), None).await?;
162//!
163//!     // Delete subscription if needed.
164//!     subscription.delete(None).await?;
165//!
166//!     Ok(())
167//! }
168//! ```
169//!
170//! ### Subscribe Message (Alternative Way)
171//!
172//! After canceling, wait until all pulled messages are processed.
173//! ```
174//! use std::time::Duration;
175//! use futures_util::StreamExt;
176//! use google_cloud_pubsub::client::{Client, ClientConfig};
177//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
178//! use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig};
179//! use google_cloud_gax::grpc::Status;
180//!
181//! async fn run(config: ClientConfig) -> Result<(), Status> {
182//!     // Creating Client, Topic and Subscription...
183//!     let client = Client::new(config).await.unwrap();
184//!     let subscription = client.subscription("test-subscription");
185//!
186//!     // Read the messages as a stream
187//!     let mut stream = subscription.subscribe(None).await.unwrap();
188//!     let cancellable = stream.cancellable();
189//!     let task = tokio::spawn(async move {
190//!         // None if the stream is cancelled
191//!         while let Some(message) = stream.next().await {
192//!             message.ack().await.unwrap();
193//!         }
194//!     });
195//!     tokio::time::sleep(Duration::from_secs(60)).await;
196//!     cancellable.cancel();
197//!     let _ = task.await;
198//!     Ok(())
199//! }
200//! ```
201//!
202//! Unprocessed messages are nack after cancellation.
203//! ```
204//! use std::time::Duration;
205//! use google_cloud_pubsub::client::{Client, ClientConfig};
206//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
207//! use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig};
208//! use google_cloud_gax::grpc::Status;
209//!
210//! async fn run(config: ClientConfig) -> Result<(), Status> {
211//!     // Creating Client, Topic and Subscription...
212//!     let client = Client::new(config).await.unwrap();
213//!     let subscription = client.subscription("test-subscription");
214//!
215//!     // Read the messages as a stream
216//!     let mut stream = subscription.subscribe(None).await.unwrap();
217//!     let cancellable = stream.cancellable();
218//!     let task = tokio::spawn(async move {
219//!         // None if the tream is cancelled
220//!         while let Some(message) = stream.read().await {
221//!             message.ack().await.unwrap();
222//!         }
223//!     });
224//!     tokio::time::sleep(Duration::from_secs(60)).await;
225//!     cancellable.cancel();
226//!     let _ = task.await;
227//!     Ok(())
228//! }
229//! ```
230pub mod apiv1;
231pub mod client;
232pub mod publisher;
233pub mod subscriber;
234pub mod subscription;
235pub mod topic;
236pub mod util;