1use std::collections::HashMap;
2use std::time::Duration;
3
4use prost_types::DurationError;
5
6use google_cloud_gax::grpc::{Code, Status};
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::pubsub::v1::{
9 DeleteTopicRequest, GetTopicRequest, IngestionDataSourceSettings, ListTopicSubscriptionsRequest,
10 MessageStoragePolicy, SchemaSettings, Topic as InternalTopic,
11};
12
13use crate::apiv1::publisher_client::PublisherClient;
14use crate::apiv1::subscriber_client::SubscriberClient;
15use crate::publisher::{Publisher, PublisherConfig};
16use crate::subscription::Subscription;
17
18#[derive(Debug, Clone)]
19pub struct TopicConfig {
20 pub labels: HashMap<String, String>,
21 pub message_storage_policy: Option<MessageStoragePolicy>,
22 pub kms_key_name: String,
23 pub schema_settings: Option<SchemaSettings>,
24 pub satisfies_pzs: bool,
25 pub message_retention_duration: Option<Duration>,
26 pub ingestion_data_source_settings: Option<IngestionDataSourceSettings>,
27}
28
29impl Default for TopicConfig {
30 fn default() -> Self {
31 Self {
32 labels: HashMap::default(),
33 message_storage_policy: None,
34 kms_key_name: "".to_string(),
35 schema_settings: None,
36 satisfies_pzs: false,
37 message_retention_duration: None,
38 ingestion_data_source_settings: None,
39 }
40 }
41}
42
43#[derive(Clone, Debug)]
47pub struct Topic {
48 fqtn: String,
49 pubc: PublisherClient,
50 subc: SubscriberClient,
51}
52
53impl Topic {
54 pub(crate) fn new(fqtn: String, pubc: PublisherClient, subc: SubscriberClient) -> Self {
55 Self { fqtn, pubc, subc }
56 }
57
58 pub fn id(&self) -> String {
60 self.fqtn
61 .rfind('/')
62 .map_or("".to_string(), |i| self.fqtn[(i + 1)..].to_string())
63 }
64
65 pub fn fully_qualified_name(&self) -> &str {
67 self.fqtn.as_str()
68 }
69
70 pub fn new_publisher(&self, config: Option<PublisherConfig>) -> Publisher {
71 Publisher::new(self.fqtn.clone(), self.pubc.clone(), config)
72 }
73
74 pub async fn create(&self, cfg: Option<TopicConfig>, retry: Option<RetrySetting>) -> Result<(), Status> {
76 let topic_config = cfg.unwrap_or_default();
77 let req = InternalTopic {
78 name: self.fully_qualified_name().to_string(),
79 labels: topic_config.labels,
80 message_storage_policy: topic_config.message_storage_policy,
81 kms_key_name: topic_config.kms_key_name,
82 schema_settings: topic_config.schema_settings,
83 satisfies_pzs: topic_config.satisfies_pzs,
84 message_retention_duration: topic_config
85 .message_retention_duration
86 .map(Duration::try_into)
87 .transpose()
88 .map_err(|err: DurationError| Status::internal(err.to_string()))?,
89 state: 0,
90 ingestion_data_source_settings: topic_config.ingestion_data_source_settings,
91 };
92 self.pubc.create_topic(req, retry).await.map(|_v| ())
93 }
94
95 pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
97 let req = DeleteTopicRequest {
98 topic: self.fqtn.to_string(),
99 };
100 self.pubc.delete_topic(req, retry).await.map(|v| v.into_inner())
101 }
102
103 pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
105 if self.fqtn == "_deleted-topic_" {
106 return Ok(false);
107 }
108 let req = GetTopicRequest {
109 topic: self.fqtn.to_string(),
110 };
111 match self.pubc.get_topic(req, retry).await {
112 Ok(_) => Ok(true),
113 Err(e) => {
114 if e.code() == Code::NotFound {
115 Ok(false)
116 } else {
117 Err(e)
118 }
119 }
120 }
121 }
122
123 pub async fn subscriptions(&self, retry: Option<RetrySetting>) -> Result<Vec<Subscription>, Status> {
125 let req = ListTopicSubscriptionsRequest {
126 topic: self.fqtn.to_string(),
127 page_size: 0,
128 page_token: "".to_string(),
129 };
130 self.pubc.list_topic_subscriptions(req, retry).await.map(|v| {
131 v.into_iter()
132 .map(|sub_name| Subscription::new(sub_name, self.subc.clone()))
133 .collect()
134 })
135 }
136}
137
138#[cfg(test)]
139mod tests {
140 use std::time::Duration;
141
142 use serial_test::serial;
143 use tokio::task::JoinHandle;
144 use tokio::time::sleep;
145 use uuid::Uuid;
146
147 use google_cloud_gax::conn::{ConnectionOptions, Environment};
148 use google_cloud_gax::grpc::{Code, Status};
149 use google_cloud_googleapis::pubsub::v1::PubsubMessage;
150
151 use crate::apiv1::conn_pool::ConnectionManager;
152 use crate::apiv1::publisher_client::PublisherClient;
153 use crate::apiv1::subscriber_client::SubscriberClient;
154 use crate::publisher::{Publisher, PublisherConfig};
155 use crate::topic::Topic;
156
157 #[ctor::ctor]
158 fn init() {
159 let _ = tracing_subscriber::fmt().try_init();
160 }
161
162 async fn create_topic() -> Topic {
163 let environment = Environment::Emulator("localhost:8681".to_string());
164 let cm1 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
165 .await
166 .unwrap();
167 let pubc = PublisherClient::new(cm1);
168 let cm2 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
169 .await
170 .unwrap();
171 let cm3 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
172 .await
173 .unwrap();
174 let subc = SubscriberClient::new(cm2, cm3);
175
176 let uuid = Uuid::new_v4().hyphenated().to_string();
177 let topic_name = format!("projects/local-project/topics/t{uuid}");
178
179 let topic = Topic::new(topic_name, pubc, subc);
181 if !topic.exists(None).await.unwrap() {
182 topic.create(None, None).await.unwrap();
183 }
184 topic
185 }
186
187 async fn publish(publisher: Publisher) -> Vec<JoinHandle<Result<String, Status>>> {
188 (0..10)
189 .map(|_i| {
190 let publisher = publisher.clone();
191 tokio::spawn(async move {
192 let msg = PubsubMessage {
193 data: "abc".into(),
194 ..Default::default()
195 };
196 let awaiter = publisher.publish(msg).await;
197 awaiter.get().await
198 })
199 })
200 .collect()
201 }
202
203 async fn publish_after_shutdown(bulk: bool) {
204 let topic = create_topic().await;
205 let config = PublisherConfig {
206 flush_interval: Duration::from_secs(10),
207 bundle_size: 11,
208 ..Default::default()
209 };
210 let publisher = topic.new_publisher(Some(config));
211
212 let tasks = publish(publisher.clone()).await;
214
215 sleep(Duration::from_secs(1)).await;
217 let mut publisher = publisher;
218 publisher.shutdown().await;
219
220 for task in tasks {
222 let message_id = task.await.unwrap();
223 assert!(message_id.is_ok());
224 assert!(!message_id.unwrap().is_empty());
225 }
226
227 let results = if bulk {
229 let m1 = PubsubMessage::default();
230 let m2 = PubsubMessage {
231 ordering_key: "test".to_string(),
232 ..Default::default()
233 };
234 publisher.publish_bulk(vec![m1, m2]).await
235 } else {
236 vec![publisher.publish(PubsubMessage::default()).await]
237 };
238 for result in results {
239 let err = result.get().await.unwrap_err();
240 assert_eq!(Code::Cancelled, err.code());
241 assert_eq!("closed", err.message());
242 }
243
244 topic.delete(None).await.unwrap();
245 assert!(!topic.exists(None).await.unwrap());
246 }
247
248 #[tokio::test]
249 #[serial]
250 async fn test_publish() {
251 let topic = create_topic().await;
252 let publisher = topic.new_publisher(None);
253
254 let tasks = publish(publisher.clone()).await;
256
257 for task in tasks {
259 let message_id = task.await.unwrap().unwrap();
260 tracing::trace!("{}", message_id);
261 assert!(!message_id.is_empty())
262 }
263
264 let mut publisher = publisher;
266 publisher.shutdown().await;
267
268 let result = publisher.publish(PubsubMessage::default()).await.get().await;
270 assert!(result.is_err());
271
272 topic.delete(None).await.unwrap();
273 }
274
275 #[tokio::test]
276 #[serial]
277 async fn test_publish_after_shutdown() {
278 publish_after_shutdown(false).await
279 }
280
281 #[tokio::test]
282 #[serial]
283 async fn test_publish_bulk_after_shutdown() {
284 publish_after_shutdown(true).await;
285 }
286
287 #[tokio::test]
288 #[serial]
289 async fn test_publish_immediately() {
290 let topic = create_topic().await;
291 let publisher = topic.new_publisher(None);
292
293 let msgs = ["msg1", "msg2"]
295 .map(|v| PubsubMessage {
296 data: v.into(),
297 ..Default::default()
298 })
299 .to_vec();
300 let ack_ids = publisher.publish_immediately(msgs, None).await.unwrap();
301
302 assert_eq!(2, ack_ids.len());
303
304 let mut publisher = publisher;
305 publisher.shutdown().await;
306 topic.delete(None).await.unwrap();
307 }
308}