gcloud_pubsub/lib.rs
1//! # google-cloud-pubsub
2//!
3//! Google Cloud Platform pub/sub library.
4//!
5//! * [About Cloud Pub/Sub](https://cloud.google.com/pubsub/)
6//! * [Pub/Sub API Documentation](https://cloud.google.com/pubsub/docs)
7//!
8//! ## Quickstart
9//!
10//! ### Authentication
11//! There are two ways to create a client that is authenticated against Google Cloud.
12//!
13//! #### Automatically
14//!
15//! The function `with_auth()` tries to read the credentials from a file specified in the environment variable `GOOGLE_APPLICATION_CREDENTIALS`, from `GOOGLE_APPLICATION_CREDENTIALS_JSON`, or
16//! from a metadata server.
17//!
18//! This is also described in [google-cloud-auth](https://github.com/yoshidan/google-cloud-rust/blob/main/foundation/auth/README.md)
19//!
20//! ```rust
21//! use google_cloud_pubsub::client::{ClientConfig, Client};
22//!
23//! async fn run() {
24//! let config = ClientConfig::default().with_auth().await.unwrap();
25//! let client = Client::new(config).await.unwrap();
26//! }
27//! ```
28//!
29//! #### Manually
30//!
31//! When you can't use the `gcloud` authentication but have a different way to get your credentials (e.g. a different environment variable),
32//! you can parse your own version of the 'credentials-file' and use it like this:
33//!
34//! ```rust
35//! use google_cloud_auth::credentials::CredentialsFile;
36//! // or google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile
37//! use google_cloud_pubsub::client::{ClientConfig, Client};
38//!
39//! async fn run(cred: CredentialsFile) {
40//! let config = ClientConfig::default().with_credentials(cred).await.unwrap();
41//! let client = Client::new(config).await.unwrap();
42//! }
43//! ```
44//!
45//! ### Emulator
46//! For tests, you can use the [emulator option](https://github.com/yoshidan/google-cloud-rust/blob/cbd5ed1315d7b828c89a50fe71fcbaf15ddc964b/pubsub/src/client.rs#L32).
47//! Before executing the program, specify the address of the emulator in the following environment variable:
48//!
49//! ```sh
50//! export PUBSUB_EMULATOR_HOST=localhost:8681
51//! ```
52//!
53//! ### Publish Message
54//!
55//! ```
56//! use google_cloud_pubsub::client::{Client, ClientConfig};
57//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
58//! use google_cloud_pubsub::topic::TopicConfig;
59//! use google_cloud_pubsub::subscription::SubscriptionConfig;
60//! use google_cloud_gax::grpc::Status;
61//! use tokio::task::JoinHandle;
62//! use tokio_util::sync::CancellationToken;
63//!
64//! async fn run(config: ClientConfig) -> Result<(), Status> {
65//!
66//! // Create pubsub client.
67//! let client = Client::new(config).await.unwrap();
68//!
69//! // Create topic.
70//! let topic = client.topic("test-topic");
71//! if !topic.exists(None).await? {
72//! topic.create(None, None).await?;
73//! }
74//!
75//! // Start publisher.
76//! let publisher = topic.new_publisher(None);
77//!
78//! // Publish message.
79//! let tasks : Vec<JoinHandle<Result<String,Status>>> = (0..10).into_iter().map(|_i| {
80//! let publisher = publisher.clone();
81//! tokio::spawn(async move {
82//! let msg = PubsubMessage {
83//! data: "abc".into(),
84//! // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering)
85//! ordering_key: "order".into(),
86//! ..Default::default()
87//! };
88//!
89//! // Send a message. There are also `publish_bulk` and `publish_immediately` methods.
90//! let mut awaiter = publisher.publish(msg).await;
91//!
92//! // The get method blocks until a server-generated ID or an error is returned for the published message.
93//! awaiter.get().await
94//! })
95//! }).collect();
96//!
97//! // Wait for all publish tasks to finish.
98//! for task in tasks {
99//! let message_id = task.await.unwrap()?;
100//! }
101//!
102//! // Wait for the publishers in the topic to finish.
103//! let mut publisher = publisher;
104//! publisher.shutdown();
105//!
106//! Ok(())
107//! }
108//! ```
109//!
110//! ### Subscribe Message
111//!
112//! ```
113//! use google_cloud_pubsub::client::{Client, ClientConfig};
114//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
115//! use google_cloud_pubsub::subscription::SubscriptionConfig;
116//! use google_cloud_gax::grpc::Status;
117//! use std::time::Duration;
118//! use tokio_util::sync::CancellationToken;
119//! use futures_util::StreamExt;
120//!
121//! async fn run(config: ClientConfig) -> Result<(), Status> {
122//!
123//! // Create pubsub client.
124//! let client = Client::new(config).await.unwrap();
125//!
126//! // Get the topic to subscribe to.
127//! let topic = client.topic("test-topic");
128//!
129//! // Configure the subscription.
130//! // If the subscription name does not contain a "/", the project is taken from the client above. Otherwise, the
131//! // name is treated as a fully qualified resource name.
132//! let config = SubscriptionConfig {
133//! // Enable message ordering if needed (https://cloud.google.com/pubsub/docs/ordering)
134//! enable_message_ordering: true,
135//! ..Default::default()
136//! };
137//!
138//! // Create subscription
139//! let subscription = client.subscription("test-subscription");
140//! if !subscription.exists(None).await? {
141//! subscription.create(topic.fully_qualified_name(), config, None).await?;
142//! }
143//!
144//! // Token for cancellation.
145//! let cancel = CancellationToken::new();
146//! let cancel_for_task = cancel.clone();
147//! tokio::spawn(async move {
148//! // Cancel after 10 seconds.
149//! tokio::time::sleep(Duration::from_secs(10)).await;
150//! cancel_for_task.cancel();
151//! });
152//!
153//! // Start receiving messages from the subscription.
154//! let mut iter = subscription.subscribe(None).await?;
155//! // Get buffered messages.
156//! // To close safely, use a CancellationToken to signal shutdown.
157//! while let Some(message) = tokio::select!{
158//! v = iter.next() => v,
159//! _ = cancel.cancelled() => None,
160//! } {
161//! let _ = message.ack().await;
162//! }
163//! // Wait for all the unprocessed messages to be nacked.
164//! // If you don't call dispose, the unprocessed messages will be nacked when the iterator is dropped.
165//! iter.dispose().await;
166//!
167//! // Delete subscription if needed.
168//! subscription.delete(None).await?;
169//!
170//! Ok(())
171//! }
172//! ```
173pub mod apiv1;
174pub mod client;
175pub mod publisher;
176pub mod subscriber;
177pub mod subscription;
178pub mod topic;
179pub mod util;
180
#[cfg(test)]
mod test {
    use tracing_subscriber::EnvFilter;

    /// Sets up `tracing` output for the crate's tests.
    ///
    /// Registered through `#[ctor::ctor]`, so it is expected to run once when
    /// the test binary starts rather than being called from individual tests.
    ///
    /// The filter is taken from the default environment configuration when
    /// that can be parsed; otherwise it falls back to `gcloud_pubsub=trace`.
    #[ctor::ctor]
    fn init() {
        let env_filter = match EnvFilter::try_from_default_env() {
            Ok(from_env) => from_env,
            Err(_) => EnvFilter::new("gcloud_pubsub=trace"),
        };

        let builder = tracing_subscriber::fmt().with_env_filter(env_filter);
        // Installation is best-effort: the result of `try_init` is deliberately
        // discarded so a failed installation never aborts the test binary.
        let _ = builder.try_init();
    }
}
190}