cloud_pubsub/
subscription.rs

1use crate::client::Client;
2use crate::error;
3use crate::message::{FromPubSubMessage, Message};
4use hyper::body::Buf;
5use hyper::{Method, StatusCode};
6use lazy_static::lazy_static;
7use serde_derive::{Deserialize, Serialize};
8use std::env;
9
10lazy_static! {
11    static ref PUBSUB_HOST: String = env::var("PUBSUB_EMULATOR_HOST")
12        .map(|host| format!("http://{}", host))
13        .unwrap_or_else(|_| String::from("https://pubsub.googleapis.com"));
14}
15
16#[derive(Deserialize)]
17struct Response {
18    #[serde(alias = "receivedMessages")]
19    received_messages: Option<Vec<Message>>,
20    error: Option<error::Error>,
21}
22
23#[derive(Serialize)]
24struct AckRequest {
25    #[serde(alias = "ackIds")]
26    ack_ids: Vec<String>,
27}
28
29#[derive(Deserialize, Serialize, Clone)]
30pub struct Subscription {
31    #[serde(skip_serializing)]
32    pub name: String,
33    pub topic: Option<String>,
34
35    #[serde(skip)]
36    pub(crate) client: Option<Client>,
37}
38
39impl Subscription {
40    pub async fn acknowledge_messages(&self, ids: Vec<String>) {
41        let client = self
42            .client
43            .as_ref()
44            .expect("Subscription was not created using a client");
45
46        let uri: hyper::Uri = format!("{}/v1/{}:acknowledge", *PUBSUB_HOST, self.name)
47            .parse()
48            .unwrap();
49
50        let json = serde_json::to_string(&AckRequest { ack_ids: ids }).unwrap();
51
52        let mut req = client.request(Method::POST, json);
53        *req.uri_mut() = uri.clone();
54
55        if let Err(e) = client.hyper_client().request(req).await {
56            log::error!("Failed ACK: {}", e);
57        }
58    }
59
60    pub async fn get_messages<T: FromPubSubMessage>(
61        &self,
62        max_messages: i32,
63    ) -> Result<Vec<(Result<T, error::Error>, String)>, error::Error> {
64        let client = self
65            .client
66            .as_ref()
67            .expect("Subscription was not created using a client");
68
69        let uri: hyper::Uri = format!("{}/v1/{}:pull", *PUBSUB_HOST, self.name)
70            .parse()
71            .unwrap();
72
73        let json = format!("{{\"maxMessages\": {}}}", max_messages);
74
75        let mut req = client.request(Method::POST, json);
76        *req.uri_mut() = uri.clone();
77
78        let response = client.hyper_client().request(req).await?;
79
80        if response.status() == StatusCode::NOT_FOUND {
81            return Err(error::Error::PubSub {
82                code: 404,
83                status: "Subscription Not Found".to_string(),
84                message: self.name.clone(),
85            });
86        }
87        let body = hyper::body::aggregate(response).await?;
88        let response: Response = serde_json::from_reader(body.reader())?;
89        if let Some(e) = response.error {
90            return Err(e);
91        }
92        let messages = response
93            .received_messages
94            .unwrap_or_default()
95            .into_iter()
96            .map(|m| (T::from(m.message), m.ack_id))
97            .collect();
98        Ok(messages)
99    }
100
101    pub async fn destroy(self) -> Result<(), error::Error> {
102        let client = self
103            .client
104            .expect("Subscription was not created using a client");
105
106        let uri: hyper::Uri = format!("{}/v1/{}", *PUBSUB_HOST, self.name)
107            .parse()
108            .unwrap();
109
110        let mut req = client.request(Method::DELETE, "");
111        *req.uri_mut() = uri.clone();
112
113        if let Err(e) = client.hyper_client().request(req).await {
114            Err(e.into())
115        } else {
116            Ok(())
117        }
118    }
119
120    pub fn client(&self) -> &Client {
121        self.client.as_ref().unwrap()
122    }
123}