1use std::collections::HashMap;
2use std::time::Duration;
3
4use prost_types::DurationError;
5
6use google_cloud_gax::grpc::{Code, Status};
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::pubsub::v1::{
9 DeleteTopicRequest, GetTopicRequest, IngestionDataSourceSettings, ListTopicSubscriptionsRequest,
10 MessageStoragePolicy, MessageTransform, SchemaSettings, Topic as InternalTopic,
11};
12
13use crate::apiv1::publisher_client::PublisherClient;
14use crate::apiv1::subscriber_client::SubscriberClient;
15use crate::publisher::{Publisher, PublisherConfig};
16use crate::subscription::Subscription;
17
18#[derive(Debug, Clone, Default)]
19pub struct TopicConfig {
20 pub labels: HashMap<String, String>,
21 pub message_storage_policy: Option<MessageStoragePolicy>,
22 pub kms_key_name: String,
23 pub schema_settings: Option<SchemaSettings>,
24 pub satisfies_pzs: bool,
25 pub message_retention_duration: Option<Duration>,
26 pub ingestion_data_source_settings: Option<IngestionDataSourceSettings>,
27 pub message_transform: Vec<MessageTransform>,
28}
29
30#[derive(Clone, Debug)]
34pub struct Topic {
35 fqtn: String,
36 pubc: PublisherClient,
37 subc: SubscriberClient,
38}
39
40impl Topic {
41 pub(crate) fn new(fqtn: String, pubc: PublisherClient, subc: SubscriberClient) -> Self {
42 Self { fqtn, pubc, subc }
43 }
44
45 pub fn id(&self) -> String {
47 self.fqtn
48 .rfind('/')
49 .map_or("".to_string(), |i| self.fqtn[(i + 1)..].to_string())
50 }
51
52 pub fn fully_qualified_name(&self) -> &str {
54 self.fqtn.as_str()
55 }
56
57 pub fn new_publisher(&self, config: Option<PublisherConfig>) -> Publisher {
58 Publisher::new(self.fqtn.clone(), self.pubc.clone(), config)
59 }
60
61 pub async fn create(&self, cfg: Option<TopicConfig>, retry: Option<RetrySetting>) -> Result<(), Status> {
63 let topic_config = cfg.unwrap_or_default();
64 let req = InternalTopic {
65 name: self.fully_qualified_name().to_string(),
66 labels: topic_config.labels,
67 message_storage_policy: topic_config.message_storage_policy,
68 kms_key_name: topic_config.kms_key_name,
69 schema_settings: topic_config.schema_settings,
70 satisfies_pzs: topic_config.satisfies_pzs,
71 message_retention_duration: topic_config
72 .message_retention_duration
73 .map(Duration::try_into)
74 .transpose()
75 .map_err(|err: DurationError| Status::internal(err.to_string()))?,
76 state: 0,
77 ingestion_data_source_settings: topic_config.ingestion_data_source_settings,
78 message_transforms: topic_config.message_transform,
79 };
80 self.pubc.create_topic(req, retry).await.map(|_v| ())
81 }
82
83 pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
85 let req = DeleteTopicRequest {
86 topic: self.fqtn.to_string(),
87 };
88 self.pubc.delete_topic(req, retry).await.map(|v| v.into_inner())
89 }
90
91 pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
93 if self.fqtn == "_deleted-topic_" {
94 return Ok(false);
95 }
96 let req = GetTopicRequest {
97 topic: self.fqtn.to_string(),
98 };
99 match self.pubc.get_topic(req, retry).await {
100 Ok(_) => Ok(true),
101 Err(e) => {
102 if e.code() == Code::NotFound {
103 Ok(false)
104 } else {
105 Err(e)
106 }
107 }
108 }
109 }
110
111 pub async fn subscriptions(&self, retry: Option<RetrySetting>) -> Result<Vec<Subscription>, Status> {
113 let req = ListTopicSubscriptionsRequest {
114 topic: self.fqtn.to_string(),
115 page_size: 0,
116 page_token: "".to_string(),
117 };
118 self.pubc.list_topic_subscriptions(req, retry).await.map(|v| {
119 v.into_iter()
120 .map(|sub_name| Subscription::new(sub_name, self.subc.clone()))
121 .collect()
122 })
123 }
124}
125
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use serial_test::serial;
    use tokio::task::JoinHandle;
    use tokio::time::sleep;
    use uuid::Uuid;

    use google_cloud_gax::conn::{ConnectionOptions, Environment};
    use google_cloud_gax::grpc::{Code, Status};
    use google_cloud_googleapis::pubsub::v1::PubsubMessage;

    use crate::apiv1::conn_pool::ConnectionManager;
    use crate::apiv1::publisher_client::PublisherClient;
    use crate::apiv1::subscriber_client::SubscriberClient;
    use crate::publisher::{Publisher, PublisherConfig};
    use crate::topic::Topic;

    /// Installs the tracing subscriber once at load time; a failed install is ignored.
    #[ctor::ctor]
    fn init() {
        let _ = tracing_subscriber::fmt().try_init();
    }

    /// Opens one connection manager against the given environment with default options.
    async fn connect(environment: &Environment) -> ConnectionManager {
        ConnectionManager::new(4, "", environment, &ConnectionOptions::default())
            .await
            .unwrap()
    }

    /// Creates a uniquely named topic on the emulator at `localhost:8681`.
    async fn create_topic() -> Topic {
        let environment = Environment::Emulator("localhost:8681".to_string());

        let pubc = PublisherClient::new(connect(&environment).await);
        let first = connect(&environment).await;
        let second = connect(&environment).await;
        let subc = SubscriberClient::new(first, second);

        // A random suffix gives every test run its own topic.
        let suffix = Uuid::new_v4().hyphenated().to_string();
        let topic = Topic::new(format!("projects/local-project/topics/t{suffix}"), pubc, subc);

        let already_there = topic.exists(None).await.unwrap();
        if !already_there {
            topic.create(None, None).await.unwrap();
        }
        topic
    }

    /// Spawns ten tasks that each publish one `"abc"` message and wait for its result.
    async fn publish(publisher: Publisher) -> Vec<JoinHandle<Result<String, Status>>> {
        let mut handles = Vec::new();
        for _ in 0..10 {
            let publisher = publisher.clone();
            let handle = tokio::spawn(async move {
                let message = PubsubMessage {
                    data: "abc".into(),
                    ..Default::default()
                };
                let pending = publisher.publish(message).await;
                pending.get().await
            });
            handles.push(handle);
        }
        handles
    }

    /// Shared scenario: messages queued before shutdown must succeed, messages
    /// published afterwards must fail with `Cancelled` / `"closed"`.
    async fn publish_after_shutdown(bulk: bool) {
        let topic = create_topic().await;

        // NOTE(review): with ten messages, a bundle size of 11 and a 10s flush interval,
        // the messages are presumably still buffered when shutdown runs - confirm.
        let mut publisher = topic.new_publisher(Some(PublisherConfig {
            flush_interval: Duration::from_secs(10),
            bundle_size: 11,
            ..Default::default()
        }));

        let tasks = publish(publisher.clone()).await;

        // Give the spawned tasks a moment before shutting the publisher down.
        sleep(Duration::from_secs(1)).await;
        publisher.shutdown().await;

        // Everything submitted before the shutdown must have been delivered.
        for task in tasks {
            let message_id = task.await.unwrap();
            assert!(message_id.is_ok());
            assert!(!message_id.unwrap().is_empty());
        }

        // Anything submitted after the shutdown must be rejected.
        let results = if bulk {
            let keyed = PubsubMessage {
                ordering_key: "test".to_string(),
                ..Default::default()
            };
            publisher.publish_bulk(vec![PubsubMessage::default(), keyed]).await
        } else {
            let single = publisher.publish(PubsubMessage::default()).await;
            vec![single]
        };
        for result in results {
            let err = result.get().await.unwrap_err();
            assert_eq!(Code::Cancelled, err.code());
            assert_eq!("closed", err.message());
        }

        topic.delete(None).await.unwrap();
        assert!(!topic.exists(None).await.unwrap());
    }

    #[tokio::test]
    #[serial]
    async fn test_publish() {
        let topic = create_topic().await;
        let mut publisher = topic.new_publisher(None);

        let tasks = publish(publisher.clone()).await;

        // Every spawned publish must yield a non-empty message id.
        for task in tasks {
            let message_id = task.await.unwrap().unwrap();
            tracing::trace!("{}", message_id);
            assert!(!message_id.is_empty())
        }

        publisher.shutdown().await;

        // Publishing on a shut-down publisher must fail.
        let result = publisher.publish(PubsubMessage::default()).await.get().await;
        assert!(result.is_err());

        topic.delete(None).await.unwrap();
    }

    #[tokio::test]
    #[serial]
    async fn test_publish_after_shutdown() {
        publish_after_shutdown(false).await;
    }

    #[tokio::test]
    #[serial]
    async fn test_publish_bulk_after_shutdown() {
        publish_after_shutdown(true).await
    }

    #[tokio::test]
    #[serial]
    async fn test_publish_immediately() {
        let topic = create_topic().await;
        let mut publisher = topic.new_publisher(None);

        let msgs: Vec<PubsubMessage> = ["msg1", "msg2"]
            .iter()
            .map(|payload| PubsubMessage {
                data: (*payload).into(),
                ..Default::default()
            })
            .collect();
        let message_ids = publisher.publish_immediately(msgs, None).await.unwrap();

        // One id is expected per message sent.
        assert_eq!(2, message_ids.len());

        publisher.shutdown().await;
        topic.delete(None).await.unwrap();
    }
}