1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//! # google-cloud-pubsub
//!
//! Google Cloud Platform pub/sub library.
//!
//! * [About Cloud Pub/Sub](https://cloud.google.com/pubsub/)
//! * [Pub/Sub API Documentation](https://cloud.google.com/pubsub/docs)
//!
//! ## Quick Start
//!
//! ### Publish Message
//!
//! ```
//! use google_cloud_pubsub::client::Client;
//! use google_cloud_gax::cancel::CancellationToken;
//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
//! use google_cloud_pubsub::topic::TopicConfig;
//! use google_cloud_pubsub::subscription::SubscriptionConfig;
//! use google_cloud_gax::grpc::Status;
//! use tokio::task::JoinHandle;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Status> {
//!
//!     // Create pubsub client.
//!     // The default project is determined by credentials.
//!     // - If GOOGLE_APPLICATION_CREDENTIALS is set, the project_id is taken from the credentials.
//!     // - If the server is running on GCP, the project_id is taken from the metadata server.
//!     // - If PUBSUB_EMULATOR_HOST is set, the project_id is 'local-project'.
//!     let mut client = Client::default().await.unwrap();
//!
//!     // Create topic.
//!     let topic = client.topic("test-topic");
//!     if !topic.exists(None, None).await? {
//!         topic.create(None, None, None).await?;
//!     }
//!
//!     // Start publisher.
//!     let publisher = topic.new_publisher(None);
//!
//!     // Publish message.
//!     let tasks : Vec<JoinHandle<Result<String,Status>>> = (0..10).into_iter().map(|_i| {
//!         let publisher = publisher.clone();
//!         tokio::spawn(async move {
//!             let mut msg = PubsubMessage::default();
//!             msg.data = "abc".into();
//!             // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering)
//!             // msg.ordering_key = "order".into();
//!
//!             let mut awaiter = publisher.publish(msg).await;
//!             // The get method blocks until a server-generated ID or an error is returned for the published message.
//!             awaiter.get(None).await
//!         })
//!     }).collect();
//!
//!     // Wait for all publish tasks to finish.
//!     for task in tasks {
//!         let message_id = task.await.unwrap()?;
//!     }
//!
//!     // Wait for the topic's publishers to finish.
//!     let mut publisher = publisher;
//!     publisher.shutdown();
//!
//!     Ok(())
//! }
//! ```
//!
//! ### Subscribe Message
//!
//! ```
//! use google_cloud_pubsub::client::Client;
//! use google_cloud_gax::cancel::CancellationToken;
//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
//! use google_cloud_pubsub::subscription::SubscriptionConfig;
//! use google_cloud_gax::grpc::Status;
//! use std::time::Duration;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Status> {
//!
//!     // Create pubsub client.
//!     // The default project is determined by credentials.
//!     // - If GOOGLE_APPLICATION_CREDENTIALS is set, the project_id is taken from the credentials.
//!     // - If the server is running on GCP, the project_id is taken from the metadata server.
//!     // - If PUBSUB_EMULATOR_HOST is set, the project_id is 'local-project'.
//!     let mut client = Client::default().await.unwrap();
//!
//!     // Get the topic to subscribe to.
//!     let topic = client.topic("test-topic");
//!
//!     // Configure subscription.
//!     let mut config = SubscriptionConfig::default();
//!     // Enable message ordering if needed (https://cloud.google.com/pubsub/docs/ordering)
//!     config.enable_message_ordering = true;
//!
//!     // Create subscription
//!     let subscription = client.subscription("test-subscription");
//!     if !subscription.exists(None, None).await? {
//!         subscription.create(topic.fully_qualified_name(), config, None, None).await?;
//!     }
//!     // Token used to cancel receiving.
//!     let cancel = CancellationToken::new();
//!     let cancel2 = cancel.clone();
//!     tokio::spawn(async move {
//!         // Cancel after 10 seconds.
//!         tokio::time::sleep(Duration::from_secs(10)).await;
//!         cancel2.cancel();
//!     });
//!
//!     // `receive` blocks until the cancellation token is cancelled or an error occurs.
//!     subscription.receive(|mut message, cancel| async move {
//!         // Handle data.
//!         let data = message.message.data.as_slice();
//!         println!("{:?}", data);
//!
//!         // Ack or Nack message.
//!         message.ack().await;
//!     }, cancel.clone(), None).await;
//!
//!     // Delete subscription if needed.
//!     subscription.delete(None, None).await;
//!
//!     Ok(())
//! }
//! ```
// Public modules of the crate. The quick-start examples in the crate-level
// docs import `client::Client`, `topic::TopicConfig`, and
// `subscription::SubscriptionConfig` from the modules declared here.
pub mod apiv1;
pub mod client;
pub mod publisher;
pub mod subscriber;
pub mod subscription;
pub mod topic;
pub mod util;