// gcloud_pubsub/topic.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use prost_types::DurationError;
5
6use google_cloud_gax::grpc::{Code, Status};
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::pubsub::v1::{
9    DeleteTopicRequest, GetTopicRequest, IngestionDataSourceSettings, ListTopicSubscriptionsRequest,
10    MessageStoragePolicy, MessageTransform, SchemaSettings, Topic as InternalTopic,
11};
12
13use crate::apiv1::publisher_client::PublisherClient;
14use crate::apiv1::subscriber_client::SubscriberClient;
15use crate::publisher::{Publisher, PublisherConfig};
16use crate::subscription::Subscription;
17
18#[derive(Debug, Clone, Default)]
19pub struct TopicConfig {
20    pub labels: HashMap<String, String>,
21    pub message_storage_policy: Option<MessageStoragePolicy>,
22    pub kms_key_name: String,
23    pub schema_settings: Option<SchemaSettings>,
24    pub satisfies_pzs: bool,
25    pub message_retention_duration: Option<Duration>,
26    pub ingestion_data_source_settings: Option<IngestionDataSourceSettings>,
27    pub message_transform: Vec<MessageTransform>,
28}
29
30/// Topic is a reference to a PubSub topic.
31///
32/// The methods of Topic are safe for use by multiple tasks.
33#[derive(Clone, Debug)]
34pub struct Topic {
35    fqtn: String,
36    pubc: PublisherClient,
37    subc: SubscriberClient,
38}
39
40impl Topic {
41    pub(crate) fn new(fqtn: String, pubc: PublisherClient, subc: SubscriberClient) -> Self {
42        Self { fqtn, pubc, subc }
43    }
44
45    /// id returns the unique identifier of the topic within its project.
46    pub fn id(&self) -> String {
47        self.fqtn
48            .rfind('/')
49            .map_or("".to_string(), |i| self.fqtn[(i + 1)..].to_string())
50    }
51
52    /// fully_qualified_name returns the printable globally unique name for the topic.
53    pub fn fully_qualified_name(&self) -> &str {
54        self.fqtn.as_str()
55    }
56
57    pub fn new_publisher(&self, config: Option<PublisherConfig>) -> Publisher {
58        Publisher::new(self.fqtn.clone(), self.pubc.clone(), config)
59    }
60
61    /// create creates the topic.
62    pub async fn create(&self, cfg: Option<TopicConfig>, retry: Option<RetrySetting>) -> Result<(), Status> {
63        let topic_config = cfg.unwrap_or_default();
64        let req = InternalTopic {
65            name: self.fully_qualified_name().to_string(),
66            labels: topic_config.labels,
67            message_storage_policy: topic_config.message_storage_policy,
68            kms_key_name: topic_config.kms_key_name,
69            schema_settings: topic_config.schema_settings,
70            satisfies_pzs: topic_config.satisfies_pzs,
71            message_retention_duration: topic_config
72                .message_retention_duration
73                .map(Duration::try_into)
74                .transpose()
75                .map_err(|err: DurationError| Status::internal(err.to_string()))?,
76            state: 0,
77            ingestion_data_source_settings: topic_config.ingestion_data_source_settings,
78            message_transforms: topic_config.message_transform,
79        };
80        self.pubc.create_topic(req, retry).await.map(|_v| ())
81    }
82
83    /// delete deletes the topic.
84    pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
85        let req = DeleteTopicRequest {
86            topic: self.fqtn.to_string(),
87        };
88        self.pubc.delete_topic(req, retry).await.map(|v| v.into_inner())
89    }
90
91    /// exists reports whether the topic exists on the server.
92    pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
93        if self.fqtn == "_deleted-topic_" {
94            return Ok(false);
95        }
96        let req = GetTopicRequest {
97            topic: self.fqtn.to_string(),
98        };
99        match self.pubc.get_topic(req, retry).await {
100            Ok(_) => Ok(true),
101            Err(e) => {
102                if e.code() == Code::NotFound {
103                    Ok(false)
104                } else {
105                    Err(e)
106                }
107            }
108        }
109    }
110
111    /// Subscriptions returns an iterator which returns the subscriptions for this topic.
112    pub async fn subscriptions(&self, retry: Option<RetrySetting>) -> Result<Vec<Subscription>, Status> {
113        let req = ListTopicSubscriptionsRequest {
114            topic: self.fqtn.to_string(),
115            page_size: 0,
116            page_token: "".to_string(),
117        };
118        self.pubc.list_topic_subscriptions(req, retry).await.map(|v| {
119            v.into_iter()
120                .map(|sub_name| Subscription::new(sub_name, self.subc.clone()))
121                .collect()
122        })
123    }
124}
125
#[cfg(test)]
mod tests {
    //! Integration tests for `Topic` and the publishers it creates.
    //!
    //! They connect to a Pub/Sub emulator at `localhost:8681` and run serially.

    use std::time::Duration;

    use serial_test::serial;
    use tokio::task::JoinHandle;
    use tokio::time::sleep;
    use uuid::Uuid;

    use google_cloud_gax::conn::{ConnectionOptions, Environment};
    use google_cloud_gax::grpc::{Code, Status};
    use google_cloud_googleapis::pubsub::v1::PubsubMessage;

    use crate::apiv1::conn_pool::ConnectionManager;
    use crate::apiv1::publisher_client::PublisherClient;
    use crate::apiv1::subscriber_client::SubscriberClient;
    use crate::publisher::{Publisher, PublisherConfig};
    use crate::topic::Topic;

    /// Connects to the emulator and returns a uniquely named topic, creating it
    /// on the server if it does not exist yet.
    async fn create_topic() -> Topic {
        let emulator = Environment::Emulator("localhost:8681".to_string());
        let options = ConnectionOptions::default();

        let publisher_conns = ConnectionManager::new(4, "", &emulator, &options)
            .await
            .unwrap();
        let pubc = PublisherClient::new(publisher_conns);

        // The subscriber client takes two separate connection managers.
        let first_sub_conns = ConnectionManager::new(4, "", &emulator, &options)
            .await
            .unwrap();
        let second_sub_conns = ConnectionManager::new(4, "", &emulator, &options)
            .await
            .unwrap();
        let subc = SubscriberClient::new(first_sub_conns, second_sub_conns);

        // A random suffix keeps the topic name unique.
        let topic_name = format!("projects/local-project/topics/t{}", Uuid::new_v4().hyphenated());

        let topic = Topic::new(topic_name, pubc, subc);
        let already_there = topic.exists(None).await.unwrap();
        if !already_there {
            topic.create(None, None).await.unwrap();
        }
        topic
    }

    /// Spawns ten tasks that each publish one `"abc"` message and resolve to the
    /// outcome of that publish.
    async fn publish(publisher: Publisher) -> Vec<JoinHandle<Result<String, Status>>> {
        let mut handles = Vec::with_capacity(10);
        for _ in 0..10 {
            let publisher = publisher.clone();
            handles.push(tokio::spawn(async move {
                let msg = PubsubMessage {
                    data: "abc".into(),
                    ..Default::default()
                };
                let awaiter = publisher.publish(msg).await;
                awaiter.get().await
            }));
        }
        handles
    }

    /// Shared scenario: messages queued before `shutdown` must still be
    /// delivered, and anything published afterwards must fail with
    /// `Cancelled` / `"closed"`. `bulk` selects `publish_bulk` over `publish`
    /// for the post-shutdown attempt.
    async fn publish_after_shutdown(bulk: bool) {
        let topic = create_topic().await;
        // Ten messages against a bundle size of 11 and a 10s flush interval:
        // presumably nothing is sent before the shutdown one second later.
        let config = PublisherConfig {
            flush_interval: Duration::from_secs(10),
            bundle_size: 11,
            ..Default::default()
        };
        let mut publisher = topic.new_publisher(Some(config));

        let pending = publish(publisher.clone()).await;

        // Shut down after one second.
        sleep(Duration::from_secs(1)).await;
        publisher.shutdown().await;

        // Every message queued before the shutdown must have been flushed.
        for handle in pending {
            let outcome = handle.await.unwrap();
            assert!(outcome.is_ok());
            assert!(!outcome.unwrap().is_empty());
        }

        // Publishing is rejected once the publisher is closed.
        let awaiters = if bulk {
            let plain = PubsubMessage::default();
            let ordered = PubsubMessage {
                ordering_key: "test".to_string(),
                ..Default::default()
            };
            publisher.publish_bulk(vec![plain, ordered]).await
        } else {
            vec![publisher.publish(PubsubMessage::default()).await]
        };
        for awaiter in awaiters {
            let err = awaiter.get().await.unwrap_err();
            assert_eq!(Code::Cancelled, err.code());
            assert_eq!("closed", err.message());
        }

        topic.delete(None).await.unwrap();
        assert!(!topic.exists(None).await.unwrap());
    }

    /// Publishes ten messages, checks each gets a non-empty id, then checks
    /// that publishing after shutdown fails.
    #[tokio::test]
    #[serial]
    async fn test_publish() {
        let topic = create_topic().await;
        let mut publisher = topic.new_publisher(None);

        let pending = publish(publisher.clone()).await;

        // Wait for every publish task to finish.
        for handle in pending {
            let message_id = handle.await.unwrap().unwrap();
            tracing::trace!("{}", message_id);
            assert!(!message_id.is_empty())
        }

        // Wait for the publisher to finish.
        publisher.shutdown().await;

        // Publishing is rejected once the publisher is closed.
        let after_shutdown = publisher.publish(PubsubMessage::default()).await.get().await;
        assert!(after_shutdown.is_err());

        topic.delete(None).await.unwrap();
    }

    /// Runs the shutdown scenario with a single `publish` call afterwards.
    #[tokio::test]
    #[serial]
    async fn test_publish_after_shutdown() {
        publish_after_shutdown(false).await
    }

    /// Runs the shutdown scenario with a `publish_bulk` call afterwards.
    #[tokio::test]
    #[serial]
    async fn test_publish_bulk_after_shutdown() {
        publish_after_shutdown(true).await;
    }

    /// Sends two messages through `publish_immediately` and expects two ids back.
    #[tokio::test]
    #[serial]
    async fn test_publish_immediately() {
        let topic = create_topic().await;
        let mut publisher = topic.new_publisher(None);

        let msgs: Vec<PubsubMessage> = ["msg1", "msg2"]
            .into_iter()
            .map(|payload| PubsubMessage {
                data: payload.into(),
                ..Default::default()
            })
            .collect();
        let returned_ids = publisher.publish_immediately(msgs, None).await.unwrap();

        assert_eq!(2, returned_ids.len());

        publisher.shutdown().await;
        topic.delete(None).await.unwrap();
    }
}