gcloud_pubsub/
topic.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use prost_types::DurationError;
5
6use google_cloud_gax::grpc::{Code, Status};
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::pubsub::v1::{
9    DeleteTopicRequest, GetTopicRequest, IngestionDataSourceSettings, ListTopicSubscriptionsRequest,
10    MessageStoragePolicy, MessageTransform, SchemaSettings, Topic as InternalTopic,
11};
12
13use crate::apiv1::publisher_client::PublisherClient;
14use crate::apiv1::subscriber_client::SubscriberClient;
15use crate::publisher::{Publisher, PublisherConfig};
16use crate::subscription::Subscription;
17
18#[derive(Debug, Clone, Default)]
19pub struct TopicConfig {
20    pub labels: HashMap<String, String>,
21    pub message_storage_policy: Option<MessageStoragePolicy>,
22    pub kms_key_name: String,
23    pub schema_settings: Option<SchemaSettings>,
24    pub satisfies_pzs: bool,
25    pub message_retention_duration: Option<Duration>,
26    pub ingestion_data_source_settings: Option<IngestionDataSourceSettings>,
27    pub message_transform: Vec<MessageTransform>,
28}
29
30/// Topic is a reference to a PubSub topic.
31///
32/// The methods of Topic are safe for use by multiple tasks.
33#[derive(Clone, Debug)]
34pub struct Topic {
35    fqtn: String,
36    pubc: PublisherClient,
37    subc: SubscriberClient,
38}
39
40impl Topic {
41    pub(crate) fn new(fqtn: String, pubc: PublisherClient, subc: SubscriberClient) -> Self {
42        Self { fqtn, pubc, subc }
43    }
44
45    /// id returns the unique identifier of the topic within its project.
46    pub fn id(&self) -> String {
47        self.fqtn
48            .rfind('/')
49            .map_or("".to_string(), |i| self.fqtn[(i + 1)..].to_string())
50    }
51
52    /// fully_qualified_name returns the printable globally unique name for the topic.
53    pub fn fully_qualified_name(&self) -> &str {
54        self.fqtn.as_str()
55    }
56
57    pub fn new_publisher(&self, config: Option<PublisherConfig>) -> Publisher {
58        Publisher::new(self.fqtn.clone(), self.pubc.clone(), config)
59    }
60
61    /// create creates the topic.
62    pub async fn create(&self, cfg: Option<TopicConfig>, retry: Option<RetrySetting>) -> Result<(), Status> {
63        let topic_config = cfg.unwrap_or_default();
64        let req = InternalTopic {
65            name: self.fully_qualified_name().to_string(),
66            labels: topic_config.labels,
67            message_storage_policy: topic_config.message_storage_policy,
68            kms_key_name: topic_config.kms_key_name,
69            schema_settings: topic_config.schema_settings,
70            satisfies_pzs: topic_config.satisfies_pzs,
71            message_retention_duration: topic_config
72                .message_retention_duration
73                .map(Duration::try_into)
74                .transpose()
75                .map_err(|err: DurationError| Status::internal(err.to_string()))?,
76            state: 0,
77            ingestion_data_source_settings: topic_config.ingestion_data_source_settings,
78            message_transforms: topic_config.message_transform,
79        };
80        self.pubc.create_topic(req, retry).await.map(|_v| ())
81    }
82
83    /// delete deletes the topic.
84    pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
85        let req = DeleteTopicRequest {
86            topic: self.fqtn.to_string(),
87        };
88        self.pubc.delete_topic(req, retry).await.map(|v| v.into_inner())
89    }
90
91    /// exists reports whether the topic exists on the server.
92    pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
93        if self.fqtn == "_deleted-topic_" {
94            return Ok(false);
95        }
96        let req = GetTopicRequest {
97            topic: self.fqtn.to_string(),
98        };
99        match self.pubc.get_topic(req, retry).await {
100            Ok(_) => Ok(true),
101            Err(e) => {
102                if e.code() == Code::NotFound {
103                    Ok(false)
104                } else {
105                    Err(e)
106                }
107            }
108        }
109    }
110
111    /// Subscriptions returns an iterator which returns the subscriptions for this topic.
112    pub async fn subscriptions(&self, retry: Option<RetrySetting>) -> Result<Vec<Subscription>, Status> {
113        let req = ListTopicSubscriptionsRequest {
114            topic: self.fqtn.to_string(),
115            page_size: 0,
116            page_token: "".to_string(),
117        };
118        self.pubc.list_topic_subscriptions(req, retry).await.map(|v| {
119            v.into_iter()
120                .map(|sub_name| Subscription::new(sub_name, self.subc.clone()))
121                .collect()
122        })
123    }
124}
125
126#[cfg(test)]
127mod tests {
128    use std::time::Duration;
129
130    use serial_test::serial;
131    use tokio::task::JoinHandle;
132    use tokio::time::sleep;
133    use uuid::Uuid;
134
135    use google_cloud_gax::conn::{ConnectionOptions, Environment};
136    use google_cloud_gax::grpc::{Code, Status};
137    use google_cloud_googleapis::pubsub::v1::PubsubMessage;
138
139    use crate::apiv1::conn_pool::ConnectionManager;
140    use crate::apiv1::publisher_client::PublisherClient;
141    use crate::apiv1::subscriber_client::SubscriberClient;
142    use crate::publisher::{Publisher, PublisherConfig};
143    use crate::topic::Topic;
144
145    #[ctor::ctor]
146    fn init() {
147        let _ = tracing_subscriber::fmt().try_init();
148    }
149
150    async fn create_topic() -> Topic {
151        let environment = Environment::Emulator("localhost:8681".to_string());
152        let cm1 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
153            .await
154            .unwrap();
155        let pubc = PublisherClient::new(cm1);
156        let cm2 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
157            .await
158            .unwrap();
159        let cm3 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
160            .await
161            .unwrap();
162        let subc = SubscriberClient::new(cm2, cm3);
163
164        let uuid = Uuid::new_v4().hyphenated().to_string();
165        let topic_name = format!("projects/local-project/topics/t{uuid}");
166
167        // Create topic.
168        let topic = Topic::new(topic_name, pubc, subc);
169        if !topic.exists(None).await.unwrap() {
170            topic.create(None, None).await.unwrap();
171        }
172        topic
173    }
174
175    async fn publish(publisher: Publisher) -> Vec<JoinHandle<Result<String, Status>>> {
176        (0..10)
177            .map(|_i| {
178                let publisher = publisher.clone();
179                tokio::spawn(async move {
180                    let msg = PubsubMessage {
181                        data: "abc".into(),
182                        ..Default::default()
183                    };
184                    let awaiter = publisher.publish(msg).await;
185                    awaiter.get().await
186                })
187            })
188            .collect()
189    }
190
191    async fn publish_after_shutdown(bulk: bool) {
192        let topic = create_topic().await;
193        let config = PublisherConfig {
194            flush_interval: Duration::from_secs(10),
195            bundle_size: 11,
196            ..Default::default()
197        };
198        let publisher = topic.new_publisher(Some(config));
199
200        // Publish message.
201        let tasks = publish(publisher.clone()).await;
202
203        // Shutdown after 1 sec
204        sleep(Duration::from_secs(1)).await;
205        let mut publisher = publisher;
206        publisher.shutdown().await;
207
208        // Confirm flush bundle.
209        for task in tasks {
210            let message_id = task.await.unwrap();
211            assert!(message_id.is_ok());
212            assert!(!message_id.unwrap().is_empty());
213        }
214
215        // Can't publish messages
216        let results = if bulk {
217            let m1 = PubsubMessage::default();
218            let m2 = PubsubMessage {
219                ordering_key: "test".to_string(),
220                ..Default::default()
221            };
222            publisher.publish_bulk(vec![m1, m2]).await
223        } else {
224            vec![publisher.publish(PubsubMessage::default()).await]
225        };
226        for result in results {
227            let err = result.get().await.unwrap_err();
228            assert_eq!(Code::Cancelled, err.code());
229            assert_eq!("closed", err.message());
230        }
231
232        topic.delete(None).await.unwrap();
233        assert!(!topic.exists(None).await.unwrap());
234    }
235
236    #[tokio::test]
237    #[serial]
238    async fn test_publish() {
239        let topic = create_topic().await;
240        let publisher = topic.new_publisher(None);
241
242        // Publish message.
243        let tasks = publish(publisher.clone()).await;
244
245        // Wait for all publish task finish
246        for task in tasks {
247            let message_id = task.await.unwrap().unwrap();
248            tracing::trace!("{}", message_id);
249            assert!(!message_id.is_empty())
250        }
251
252        // Wait for publishers in topic finish.
253        let mut publisher = publisher;
254        publisher.shutdown().await;
255
256        // Can't publish messages
257        let result = publisher.publish(PubsubMessage::default()).await.get().await;
258        assert!(result.is_err());
259
260        topic.delete(None).await.unwrap();
261    }
262
263    #[tokio::test]
264    #[serial]
265    async fn test_publish_after_shutdown() {
266        publish_after_shutdown(false).await
267    }
268
269    #[tokio::test]
270    #[serial]
271    async fn test_publish_bulk_after_shutdown() {
272        publish_after_shutdown(true).await;
273    }
274
275    #[tokio::test]
276    #[serial]
277    async fn test_publish_immediately() {
278        let topic = create_topic().await;
279        let publisher = topic.new_publisher(None);
280
281        // Publish message.
282        let msgs = ["msg1", "msg2"]
283            .map(|v| PubsubMessage {
284                data: v.into(),
285                ..Default::default()
286            })
287            .to_vec();
288        let ack_ids = publisher.publish_immediately(msgs, None).await.unwrap();
289
290        assert_eq!(2, ack_ids.len());
291
292        let mut publisher = publisher;
293        publisher.shutdown().await;
294        topic.delete(None).await.unwrap();
295    }
296}