google_cloud_pubsub/
topic.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use prost_types::DurationError;
5
6use google_cloud_gax::grpc::{Code, Status};
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::pubsub::v1::{
9    DeleteTopicRequest, GetTopicRequest, IngestionDataSourceSettings, ListTopicSubscriptionsRequest,
10    MessageStoragePolicy, SchemaSettings, Topic as InternalTopic,
11};
12
13use crate::apiv1::publisher_client::PublisherClient;
14use crate::apiv1::subscriber_client::SubscriberClient;
15use crate::publisher::{Publisher, PublisherConfig};
16use crate::subscription::Subscription;
17
18#[derive(Debug, Clone)]
19pub struct TopicConfig {
20    pub labels: HashMap<String, String>,
21    pub message_storage_policy: Option<MessageStoragePolicy>,
22    pub kms_key_name: String,
23    pub schema_settings: Option<SchemaSettings>,
24    pub satisfies_pzs: bool,
25    pub message_retention_duration: Option<Duration>,
26    pub ingestion_data_source_settings: Option<IngestionDataSourceSettings>,
27}
28
29impl Default for TopicConfig {
30    fn default() -> Self {
31        Self {
32            labels: HashMap::default(),
33            message_storage_policy: None,
34            kms_key_name: "".to_string(),
35            schema_settings: None,
36            satisfies_pzs: false,
37            message_retention_duration: None,
38            ingestion_data_source_settings: None,
39        }
40    }
41}
42
43/// Topic is a reference to a PubSub topic.
44///
45/// The methods of Topic are safe for use by multiple tasks.
46#[derive(Clone, Debug)]
47pub struct Topic {
48    fqtn: String,
49    pubc: PublisherClient,
50    subc: SubscriberClient,
51}
52
53impl Topic {
54    pub(crate) fn new(fqtn: String, pubc: PublisherClient, subc: SubscriberClient) -> Self {
55        Self { fqtn, pubc, subc }
56    }
57
58    /// id returns the unique identifier of the topic within its project.
59    pub fn id(&self) -> String {
60        self.fqtn
61            .rfind('/')
62            .map_or("".to_string(), |i| self.fqtn[(i + 1)..].to_string())
63    }
64
65    /// fully_qualified_name returns the printable globally unique name for the topic.
66    pub fn fully_qualified_name(&self) -> &str {
67        self.fqtn.as_str()
68    }
69
70    pub fn new_publisher(&self, config: Option<PublisherConfig>) -> Publisher {
71        Publisher::new(self.fqtn.clone(), self.pubc.clone(), config)
72    }
73
74    /// create creates the topic.
75    pub async fn create(&self, cfg: Option<TopicConfig>, retry: Option<RetrySetting>) -> Result<(), Status> {
76        let topic_config = cfg.unwrap_or_default();
77        let req = InternalTopic {
78            name: self.fully_qualified_name().to_string(),
79            labels: topic_config.labels,
80            message_storage_policy: topic_config.message_storage_policy,
81            kms_key_name: topic_config.kms_key_name,
82            schema_settings: topic_config.schema_settings,
83            satisfies_pzs: topic_config.satisfies_pzs,
84            message_retention_duration: topic_config
85                .message_retention_duration
86                .map(Duration::try_into)
87                .transpose()
88                .map_err(|err: DurationError| Status::internal(err.to_string()))?,
89            state: 0,
90            ingestion_data_source_settings: topic_config.ingestion_data_source_settings,
91        };
92        self.pubc.create_topic(req, retry).await.map(|_v| ())
93    }
94
95    /// delete deletes the topic.
96    pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
97        let req = DeleteTopicRequest {
98            topic: self.fqtn.to_string(),
99        };
100        self.pubc.delete_topic(req, retry).await.map(|v| v.into_inner())
101    }
102
103    /// exists reports whether the topic exists on the server.
104    pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
105        if self.fqtn == "_deleted-topic_" {
106            return Ok(false);
107        }
108        let req = GetTopicRequest {
109            topic: self.fqtn.to_string(),
110        };
111        match self.pubc.get_topic(req, retry).await {
112            Ok(_) => Ok(true),
113            Err(e) => {
114                if e.code() == Code::NotFound {
115                    Ok(false)
116                } else {
117                    Err(e)
118                }
119            }
120        }
121    }
122
123    /// Subscriptions returns an iterator which returns the subscriptions for this topic.
124    pub async fn subscriptions(&self, retry: Option<RetrySetting>) -> Result<Vec<Subscription>, Status> {
125        let req = ListTopicSubscriptionsRequest {
126            topic: self.fqtn.to_string(),
127            page_size: 0,
128            page_token: "".to_string(),
129        };
130        self.pubc.list_topic_subscriptions(req, retry).await.map(|v| {
131            v.into_iter()
132                .map(|sub_name| Subscription::new(sub_name, self.subc.clone()))
133                .collect()
134        })
135    }
136}
137
#[cfg(test)]
mod tests {
    //! Tests for `Topic` and the publishers it creates. They connect to an
    //! emulator endpoint at `localhost:8681` and are serialized with `#[serial]`.

    use std::time::Duration;

    use serial_test::serial;
    use tokio::task::JoinHandle;
    use tokio::time::sleep;
    use uuid::Uuid;

    use google_cloud_gax::conn::{ConnectionOptions, Environment};
    use google_cloud_gax::grpc::{Code, Status};
    use google_cloud_googleapis::pubsub::v1::PubsubMessage;

    use crate::apiv1::conn_pool::ConnectionManager;
    use crate::apiv1::publisher_client::PublisherClient;
    use crate::apiv1::subscriber_client::SubscriberClient;
    use crate::publisher::{Publisher, PublisherConfig};
    use crate::topic::Topic;

    /// Installs the tracing subscriber once; a failed install is ignored.
    #[ctor::ctor]
    fn init() {
        let _ = tracing_subscriber::fmt().try_init();
    }

    /// Connects to the emulator and returns a topic with a random name,
    /// creating it when `exists` reports it as missing.
    async fn create_topic() -> Topic {
        let environment = Environment::Emulator("localhost:8681".to_string());
        let options = ConnectionOptions::default();

        let publisher_conn = ConnectionManager::new(4, "", &environment, &options)
            .await
            .unwrap();
        let pubc = PublisherClient::new(publisher_conn);

        let subscriber_conn_a = ConnectionManager::new(4, "", &environment, &options)
            .await
            .unwrap();
        let subscriber_conn_b = ConnectionManager::new(4, "", &environment, &options)
            .await
            .unwrap();
        let subc = SubscriberClient::new(subscriber_conn_a, subscriber_conn_b);

        let uuid = Uuid::new_v4().hyphenated().to_string();
        let topic_name = format!("projects/local-project/topics/t{uuid}");

        // Create topic.
        let topic = Topic::new(topic_name, pubc, subc);
        let already_present = topic.exists(None).await.unwrap();
        if !already_present {
            topic.create(None, None).await.unwrap();
        }
        topic
    }

    /// Spawns ten tasks that each publish one `"abc"` message and resolve to
    /// the result of waiting on that publish.
    async fn publish(publisher: Publisher) -> Vec<JoinHandle<Result<String, Status>>> {
        let mut handles = Vec::with_capacity(10);
        for _ in 0..10 {
            let publisher = publisher.clone();
            let handle = tokio::spawn(async move {
                let msg = PubsubMessage {
                    data: "abc".into(),
                    ..Default::default()
                };
                let pending = publisher.publish(msg).await;
                pending.get().await
            });
            handles.push(handle);
        }
        handles
    }

    /// Shared scenario: publish ten messages, shut the publisher down, then
    /// check that the earlier messages succeeded and later publishes are
    /// rejected. `bulk` selects `publish_bulk` for the post-shutdown attempt.
    async fn publish_after_shutdown(bulk: bool) {
        let topic = create_topic().await;
        // NOTE(review): with a bundle size of 11 and a 10 s flush interval the
        // ten messages presumably stay buffered until shutdown - confirm.
        let config = PublisherConfig {
            flush_interval: Duration::from_secs(10),
            bundle_size: 11,
            ..Default::default()
        };
        let mut publisher = topic.new_publisher(Some(config));

        // Publish message.
        let tasks = publish(publisher.clone()).await;

        // Shutdown after 1 sec
        sleep(Duration::from_secs(1)).await;
        publisher.shutdown().await;

        // Confirm flush bundle.
        for task in tasks {
            let message_id = task.await.unwrap();
            assert!(message_id.is_ok());
            assert!(!message_id.unwrap().is_empty());
        }

        // Can't publish messages
        let results = if bulk {
            let messages = vec![
                PubsubMessage::default(),
                PubsubMessage {
                    ordering_key: "test".to_string(),
                    ..Default::default()
                },
            ];
            publisher.publish_bulk(messages).await
        } else {
            let single = publisher.publish(PubsubMessage::default()).await;
            vec![single]
        };
        for result in results {
            let err = result.get().await.unwrap_err();
            assert_eq!(Code::Cancelled, err.code());
            assert_eq!("closed", err.message());
        }

        topic.delete(None).await.unwrap();
        assert!(!topic.exists(None).await.unwrap());
    }

    /// Publishing with the default configuration yields non-empty message
    /// ids, and publishing after shutdown fails.
    #[tokio::test]
    #[serial]
    async fn test_publish() {
        let topic = create_topic().await;
        let mut publisher = topic.new_publisher(None);

        // Publish message.
        let tasks = publish(publisher.clone()).await;

        // Wait for all publish task finish
        for task in tasks {
            let message_id = task.await.unwrap().unwrap();
            tracing::trace!("{}", message_id);
            assert!(!message_id.is_empty())
        }

        // Wait for publishers in topic finish.
        publisher.shutdown().await;

        // Can't publish messages
        let result = publisher.publish(PubsubMessage::default()).await.get().await;
        assert!(result.is_err());

        topic.delete(None).await.unwrap();
    }

    /// Runs the shutdown scenario with a single `publish` call.
    #[tokio::test]
    #[serial]
    async fn test_publish_after_shutdown() {
        publish_after_shutdown(false).await;
    }

    /// Runs the shutdown scenario with `publish_bulk`.
    #[tokio::test]
    #[serial]
    async fn test_publish_bulk_after_shutdown() {
        publish_after_shutdown(true).await;
    }

    /// `publish_immediately` returns one entry per message handed to it.
    #[tokio::test]
    #[serial]
    async fn test_publish_immediately() {
        let topic = create_topic().await;
        let mut publisher = topic.new_publisher(None);

        // Publish message.
        let msgs: Vec<PubsubMessage> = ["msg1", "msg2"]
            .iter()
            .map(|payload| PubsubMessage {
                data: (*payload).into(),
                ..Default::default()
            })
            .collect();
        let published = publisher.publish_immediately(msgs, None).await.unwrap();

        assert_eq!(2, published.len());

        publisher.shutdown().await;
        topic.delete(None).await.unwrap();
    }
}