cloud_pubsub/
topic.rs

1use crate::client::Client;
2use crate::error;
3use crate::subscription::*;
4use crate::EncodedMessage;
5use hyper::body::Buf;
6use hyper::{Method, StatusCode};
7use lazy_static::lazy_static;
8use rand::distributions::Alphanumeric;
9use rand::{thread_rng, Rng};
10use serde::de::DeserializeOwned;
11use serde_derive::{Deserialize, Serialize};
12use std::env;
13
14lazy_static! {
15    static ref PUBSUB_HOST: String = env::var("PUBSUB_EMULATOR_HOST")
16        .map(|host| format!("http://{}", host))
17        .unwrap_or_else(|_| String::from("https://pubsub.googleapis.com"));
18}
19
20#[derive(Deserialize, Serialize)]
21pub struct Topic {
22    pub name: String,
23
24    #[serde(skip)]
25    pub(crate) client: Option<Client>,
26}
27
28#[derive(Deserialize, Debug, Clone)]
29#[serde(rename_all = "camelCase")]
30pub struct PublishMessageResponse {
31    pub message_ids: Vec<String>,
32}
33
34#[derive(Serialize, Clone)]
35pub struct PublishMessageRequest {
36    pub messages: Vec<EncodedMessage>,
37}
38
39impl Topic {
40    pub async fn subscribe(&self) -> Result<Subscription, error::Error> {
41        let client = self.client.clone();
42
43        let new_subscription = Subscription {
44            name: self.new_subscription_name(),
45            topic: Some(self.name.clone()),
46            client: None,
47        };
48
49        let uri: hyper::Uri = format!("{}/v1/{}", *PUBSUB_HOST, new_subscription.name)
50            .parse()
51            .unwrap();
52
53        let mut sub = self
54            .perform_request::<Subscription, Subscription>(uri, Method::PUT, new_subscription)
55            .await?;
56
57        sub.client = client.clone();
58        Ok(sub)
59    }
60
61    pub async fn publish<T: serde::Serialize>(
62        &self,
63        data: T,
64    ) -> Result<PublishMessageResponse, error::Error> {
65        self.publish_message(EncodedMessage::new(&data, None)).await
66    }
67
68    pub async fn publish_message(
69        &self,
70        message: EncodedMessage,
71    ) -> Result<PublishMessageResponse, error::Error> {
72        let uri: hyper::Uri = format!("{}/v1/{}:publish", *PUBSUB_HOST, self.name)
73            .parse()
74            .unwrap();
75
76        let payload = PublishMessageRequest {
77            messages: vec![message],
78        };
79
80        self.perform_request::<PublishMessageRequest, PublishMessageResponse>(
81            uri,
82            Method::POST,
83            payload,
84        )
85        .await
86    }
87
88    async fn perform_request<T: serde::Serialize, U: DeserializeOwned + Clone>(
89        &self,
90        uri: hyper::Uri,
91        method: Method,
92        data: T,
93    ) -> Result<U, error::Error> {
94        let client = self
95            .client
96            .clone()
97            .expect("Topic must be created using a client");
98
99        let json = serde_json::to_string(&data).expect("Failed to serialize request body.");
100        let mut req = client.request(method, json);
101        *req.uri_mut() = uri;
102
103        let response = client.hyper_client().request(req).await?;
104        match response.status() {
105            StatusCode::NOT_FOUND => Err(error::Error::PubSub {
106                code: 404,
107                status: "Topic Not Found".to_string(),
108                message: self.name.clone(),
109            }),
110            StatusCode::OK => {
111                let body = hyper::body::aggregate(response).await?;
112                serde_json::from_reader(body.reader()).map_err(|e| e.into())
113            }
114            code => {
115                let body = hyper::body::aggregate(response).await?;
116                let mut buf = String::new();
117                use std::io::Read;
118                body.reader().read_to_string(&mut buf)?;
119                Err(error::Error::PubSub {
120                    code: code.as_u16() as i32,
121                    status: "Error occurred attempting to subscribe".to_string(),
122                    message: buf,
123                })
124            }
125        }
126    }
127
128    fn new_subscription_name(&self) -> String {
129        let project = self.client.clone().unwrap().project();
130        let slug = thread_rng()
131            .sample_iter(&Alphanumeric)
132            .take(30)
133            .map(char::from)
134            .collect::<String>();
135
136        format!("projects/{}/subscriptions/RST{}", project, slug)
137    }
138}