1use std::collections::HashMap;
2use std::time::Duration;
3
4use prost_types::DurationError;
5
6use google_cloud_gax::grpc::{Code, Status};
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::pubsub::v1::{
9 DeleteTopicRequest, GetTopicRequest, IngestionDataSourceSettings, ListTopicSubscriptionsRequest,
10 MessageStoragePolicy, MessageTransform, SchemaSettings, Topic as InternalTopic,
11};
12
13use crate::apiv1::publisher_client::PublisherClient;
14use crate::apiv1::subscriber_client::SubscriberClient;
15use crate::publisher::{Publisher, PublisherConfig};
16use crate::subscription::Subscription;
17
18#[derive(Debug, Clone, Default)]
19pub struct TopicConfig {
20 pub labels: HashMap<String, String>,
21 pub message_storage_policy: Option<MessageStoragePolicy>,
22 pub kms_key_name: String,
23 pub schema_settings: Option<SchemaSettings>,
24 pub satisfies_pzs: bool,
25 pub message_retention_duration: Option<Duration>,
26 pub ingestion_data_source_settings: Option<IngestionDataSourceSettings>,
27 pub message_transform: Vec<MessageTransform>,
28}
29
30#[derive(Clone, Debug)]
34pub struct Topic {
35 fqtn: String,
36 pubc: PublisherClient,
37 subc: SubscriberClient,
38}
39
40impl Topic {
41 pub(crate) fn new(fqtn: String, pubc: PublisherClient, subc: SubscriberClient) -> Self {
42 Self { fqtn, pubc, subc }
43 }
44
45 pub fn id(&self) -> String {
47 self.fqtn
48 .rfind('/')
49 .map_or("".to_string(), |i| self.fqtn[(i + 1)..].to_string())
50 }
51
52 pub fn fully_qualified_name(&self) -> &str {
54 self.fqtn.as_str()
55 }
56
57 pub fn new_publisher(&self, config: Option<PublisherConfig>) -> Publisher {
58 Publisher::new(self.fqtn.clone(), self.pubc.clone(), config)
59 }
60
61 pub async fn create(&self, cfg: Option<TopicConfig>, retry: Option<RetrySetting>) -> Result<(), Status> {
63 let topic_config = cfg.unwrap_or_default();
64 let req = InternalTopic {
65 name: self.fully_qualified_name().to_string(),
66 labels: topic_config.labels,
67 message_storage_policy: topic_config.message_storage_policy,
68 kms_key_name: topic_config.kms_key_name,
69 schema_settings: topic_config.schema_settings,
70 satisfies_pzs: topic_config.satisfies_pzs,
71 message_retention_duration: topic_config
72 .message_retention_duration
73 .map(Duration::try_into)
74 .transpose()
75 .map_err(|err: DurationError| Status::internal(err.to_string()))?,
76 state: 0,
77 ingestion_data_source_settings: topic_config.ingestion_data_source_settings,
78 message_transforms: topic_config.message_transform,
79 };
80 self.pubc.create_topic(req, retry).await.map(|_v| ())
81 }
82
83 pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
85 let req = DeleteTopicRequest {
86 topic: self.fqtn.to_string(),
87 };
88 self.pubc.delete_topic(req, retry).await.map(|v| v.into_inner())
89 }
90
91 pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
93 if self.fqtn == "_deleted-topic_" {
94 return Ok(false);
95 }
96 let req = GetTopicRequest {
97 topic: self.fqtn.to_string(),
98 };
99 match self.pubc.get_topic(req, retry).await {
100 Ok(_) => Ok(true),
101 Err(e) => {
102 if e.code() == Code::NotFound {
103 Ok(false)
104 } else {
105 Err(e)
106 }
107 }
108 }
109 }
110
111 pub async fn subscriptions(&self, retry: Option<RetrySetting>) -> Result<Vec<Subscription>, Status> {
113 let req = ListTopicSubscriptionsRequest {
114 topic: self.fqtn.to_string(),
115 page_size: 0,
116 page_token: "".to_string(),
117 };
118 self.pubc.list_topic_subscriptions(req, retry).await.map(|v| {
119 v.into_iter()
120 .map(|sub_name| Subscription::new(sub_name, self.subc.clone()))
121 .collect()
122 })
123 }
124}
125
126#[cfg(test)]
127mod tests {
128 use std::time::Duration;
129
130 use serial_test::serial;
131 use tokio::task::JoinHandle;
132 use tokio::time::sleep;
133 use uuid::Uuid;
134
135 use google_cloud_gax::conn::{ConnectionOptions, Environment};
136 use google_cloud_gax::grpc::{Code, Status};
137 use google_cloud_googleapis::pubsub::v1::PubsubMessage;
138
139 use crate::apiv1::conn_pool::ConnectionManager;
140 use crate::apiv1::publisher_client::PublisherClient;
141 use crate::apiv1::subscriber_client::SubscriberClient;
142 use crate::publisher::{Publisher, PublisherConfig};
143 use crate::topic::Topic;
144
145 async fn create_topic() -> Topic {
146 let environment = Environment::Emulator("localhost:8681".to_string());
147 let cm1 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
148 .await
149 .unwrap();
150 let pubc = PublisherClient::new(cm1);
151 let cm2 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
152 .await
153 .unwrap();
154 let cm3 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
155 .await
156 .unwrap();
157 let subc = SubscriberClient::new(cm2, cm3);
158
159 let uuid = Uuid::new_v4().hyphenated().to_string();
160 let topic_name = format!("projects/local-project/topics/t{uuid}");
161
162 let topic = Topic::new(topic_name, pubc, subc);
164 if !topic.exists(None).await.unwrap() {
165 topic.create(None, None).await.unwrap();
166 }
167 topic
168 }
169
170 async fn publish(publisher: Publisher) -> Vec<JoinHandle<Result<String, Status>>> {
171 (0..10)
172 .map(|_i| {
173 let publisher = publisher.clone();
174 tokio::spawn(async move {
175 let msg = PubsubMessage {
176 data: "abc".into(),
177 ..Default::default()
178 };
179 let awaiter = publisher.publish(msg).await;
180 awaiter.get().await
181 })
182 })
183 .collect()
184 }
185
186 async fn publish_after_shutdown(bulk: bool) {
187 let topic = create_topic().await;
188 let config = PublisherConfig {
189 flush_interval: Duration::from_secs(10),
190 bundle_size: 11,
191 ..Default::default()
192 };
193 let publisher = topic.new_publisher(Some(config));
194
195 let tasks = publish(publisher.clone()).await;
197
198 sleep(Duration::from_secs(1)).await;
200 let mut publisher = publisher;
201 publisher.shutdown().await;
202
203 for task in tasks {
205 let message_id = task.await.unwrap();
206 assert!(message_id.is_ok());
207 assert!(!message_id.unwrap().is_empty());
208 }
209
210 let results = if bulk {
212 let m1 = PubsubMessage::default();
213 let m2 = PubsubMessage {
214 ordering_key: "test".to_string(),
215 ..Default::default()
216 };
217 publisher.publish_bulk(vec![m1, m2]).await
218 } else {
219 vec![publisher.publish(PubsubMessage::default()).await]
220 };
221 for result in results {
222 let err = result.get().await.unwrap_err();
223 assert_eq!(Code::Cancelled, err.code());
224 assert_eq!("closed", err.message());
225 }
226
227 topic.delete(None).await.unwrap();
228 assert!(!topic.exists(None).await.unwrap());
229 }
230
231 #[tokio::test]
232 #[serial]
233 async fn test_publish() {
234 let topic = create_topic().await;
235 let publisher = topic.new_publisher(None);
236
237 let tasks = publish(publisher.clone()).await;
239
240 for task in tasks {
242 let message_id = task.await.unwrap().unwrap();
243 tracing::trace!("{}", message_id);
244 assert!(!message_id.is_empty())
245 }
246
247 let mut publisher = publisher;
249 publisher.shutdown().await;
250
251 let result = publisher.publish(PubsubMessage::default()).await.get().await;
253 assert!(result.is_err());
254
255 topic.delete(None).await.unwrap();
256 }
257
258 #[tokio::test]
259 #[serial]
260 async fn test_publish_after_shutdown() {
261 publish_after_shutdown(false).await
262 }
263
264 #[tokio::test]
265 #[serial]
266 async fn test_publish_bulk_after_shutdown() {
267 publish_after_shutdown(true).await;
268 }
269
270 #[tokio::test]
271 #[serial]
272 async fn test_publish_immediately() {
273 let topic = create_topic().await;
274 let publisher = topic.new_publisher(None);
275
276 let msgs = ["msg1", "msg2"]
278 .map(|v| PubsubMessage {
279 data: v.into(),
280 ..Default::default()
281 })
282 .to_vec();
283 let ack_ids = publisher.publish_immediately(msgs, None).await.unwrap();
284
285 assert_eq!(2, ack_ids.len());
286
287 let mut publisher = publisher;
288 publisher.shutdown().await;
289 topic.delete(None).await.unwrap();
290 }
291}