Skip to main content

gcloud_pubsub/
lib.rs

1//! # google-cloud-pubsub
2//!
3//! Google Cloud Platform pub/sub library.
4//!
5//! * [About Cloud Pub/Sub](https://cloud.google.com/pubsub/)
6//! * [Pub/Sub API Documentation](https://cloud.google.com/pubsub/docs)
7//!
8//! ## Quickstart
9//!
10//! ### Authentication
11//! There are two ways to create a client that is authenticated against Google Cloud.
12//!
13//! #### Automatically
14//!
15//! The function `with_auth()` will try to read the credentials from a file specified in the environment variable `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_APPLICATION_CREDENTIALS_JSON`, or
16//! from a metadata server.
17//!
18//! This is also described in [google-cloud-auth](https://github.com/yoshidan/google-cloud-rust/blob/main/foundation/auth/README.md)
19//!
20//! ```rust
21//! use google_cloud_pubsub::client::{ClientConfig, Client};
22//!
23//! async fn run() {
24//!     let config = ClientConfig::default().with_auth().await.unwrap();
25//!     let client = Client::new(config).await.unwrap();
26//! }
27//! ```
28//!
29//! #### Manually
30//!
31//! When you can't use the `gcloud` authentication but have a different way to get your credentials (e.g. a different environment variable),
32//! you can parse your own version of the 'credentials-file' and use it like this:
33//!
34//! ```rust
35//! use google_cloud_auth::credentials::CredentialsFile;
36//! // or google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile
37//! use google_cloud_pubsub::client::{ClientConfig, Client};
38//!
39//! async fn run(cred: CredentialsFile) {
40//!     let config = ClientConfig::default().with_credentials(cred).await.unwrap();
41//!     let client = Client::new(config).await.unwrap();
42//! }
43//! ```
44//!
45//! ### Emulator
46//! For tests, you can use the [Emulator-Option](https://github.com/yoshidan/google-cloud-rust/blob/cbd5ed1315d7b828c89a50fe71fcbaf15ddc964b/pubsub/src/client.rs#L32).
47//! Before executing the program, specify the address of the emulator in the following environment variable:
48//!
49//! ```sh
50//! export PUBSUB_EMULATOR_HOST=localhost:8681
51//! ```
52//!
53//! ### Publish Message
54//!
55//! ```
56//! use google_cloud_pubsub::client::{Client, ClientConfig};
57//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
58//! use google_cloud_pubsub::topic::TopicConfig;
59//! use google_cloud_pubsub::subscription::SubscriptionConfig;
60//! use google_cloud_gax::grpc::Status;
61//! use tokio::task::JoinHandle;
62//! use tokio_util::sync::CancellationToken;
63//!
64//! async fn run(config: ClientConfig) -> Result<(), Status> {
65//!
66//!     // Create pubsub client.
67//!     let client = Client::new(config).await.unwrap();
68//!
69//!     // Create topic.
70//!     let topic = client.topic("test-topic");
71//!     if !topic.exists(None).await? {
72//!         topic.create(None, None).await?;
73//!     }
74//!
75//!     // Start publisher.
76//!     let publisher = topic.new_publisher(None);
77//!
78//!     // Publish message.
79//!     let tasks : Vec<JoinHandle<Result<String,Status>>> = (0..10).into_iter().map(|_i| {
80//!         let publisher = publisher.clone();
81//!         tokio::spawn(async move {
82//!             let msg = PubsubMessage {
83//!                data: "abc".into(),
84//!                // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering)
85//!                ordering_key: "order".into(),
86//!                ..Default::default()
87//!             };
88//!
89//!             // Send a message. There are also `publish_bulk` and `publish_immediately` methods.
90//!             let mut awaiter = publisher.publish(msg).await;
91//!
92//!             // The get method blocks until a server-generated ID or an error is returned for the published message.
93//!             awaiter.get().await
94//!         })
95//!     }).collect();
96//!
97//!     // Wait for all publish tasks to finish.
98//!     for task in tasks {
99//!         let message_id = task.await.unwrap()?;
100//!     }
101//!
102//!     // Wait for the publishers in the topic to finish.
103//!     let mut publisher = publisher;
104//!     publisher.shutdown();
105//!
106//!     Ok(())
107//! }
108//! ```
109//!
110//! ### Subscribe Message
111//!
112//! ```
113//! use google_cloud_pubsub::client::{Client, ClientConfig};
114//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
115//! use google_cloud_pubsub::subscription::SubscriptionConfig;
116//! use google_cloud_gax::grpc::Status;
117//! use std::time::Duration;
118//! use tokio_util::sync::CancellationToken;
119//! use futures_util::StreamExt;
120//!
121//! async fn run(config: ClientConfig) -> Result<(), Status> {
122//!
123//!     // Create pubsub client.
124//!     let client = Client::new(config).await.unwrap();
125//!
126//!     // Get the topic to subscribe to.
127//!     let topic = client.topic("test-topic");
128//!
129//!     // Create subscription
130//!     // If subscription name does not contain a "/", then the project is taken from client above. Otherwise, the
131//!     // name will be treated as a fully qualified resource name
132//!     let config = SubscriptionConfig {
133//!         // Enable message ordering if needed (https://cloud.google.com/pubsub/docs/ordering)
134//!         enable_message_ordering: true,
135//!         ..Default::default()
136//!     };
137//!
138//!     // Create subscription
139//!     let subscription = client.subscription("test-subscription");
140//!     if !subscription.exists(None).await? {
141//!         subscription.create(topic.fully_qualified_name(), config, None).await?;
142//!     }
143//!
144//!     // Token for cancel.
145//!     let cancel = CancellationToken::new();
146//!     let cancel_for_task = cancel.clone();
147//!     tokio::spawn(async move {
148//!         // Cancel after 10 seconds.
149//!         tokio::time::sleep(Duration::from_secs(10)).await;
150//!         cancel_for_task.cancel();
151//!     });
152//!
153//!     // Start receiving messages from the subscription.
154//!     let mut iter = subscription.subscribe(None).await?;
155//!     // Get buffered messages.
156//!     // To close safely, use a CancellationToken to signal shutdown.
157//!     while let Some(message) = tokio::select!{
158//!         v = iter.next() => v,
159//!         _ = cancel.cancelled() => None,
160//!     } {
161//!         let _ = message.ack().await;
162//!     }
163//!     // Wait for all the unprocessed messages to be nacked.
164//!     // If you don't call dispose, the unprocessed messages will be nacked when the iterator is dropped.
165//!     iter.dispose().await;
166//!
167//!     // Delete subscription if needed.
168//!     subscription.delete(None).await?;
169//!
170//!     Ok(())
171//! }
172//! ```
// Public modules of the crate.
// The crate-level examples import `Client` and `ClientConfig` from `client`,
// `TopicConfig` from `topic`, and `SubscriptionConfig` from `subscription`.
173pub mod apiv1;
174pub mod client;
175pub mod publisher;
176pub mod subscriber;
177pub mod subscription;
178pub mod topic;
179pub mod util;
180
181#[cfg(test)]
182mod test {
    // Test-only module: sets up `tracing` output for the crate's tests.
183    use tracing_subscriber::EnvFilter;
184
    // Registered through the `ctor` attribute macro; presumably this runs once
    // at load time, before any test executes — confirm against the `ctor` docs.
185    #[ctor::ctor]
186    fn init() {
        // Use the filter from the default environment variable when it can be
        // read; otherwise fall back to the `gcloud_pubsub=trace` directive.
187        let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("gcloud_pubsub=trace"));
        // The `try_init` result is deliberately discarded, so a failure to
        // install the subscriber (presumably because one is already set) is
        // ignored rather than causing a panic.
188        let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
189    }
190}