cloud_pubsub/
subscription.rs1use crate::client::Client;
2use crate::error;
3use crate::message::{FromPubSubMessage, Message};
4use hyper::body::Buf;
5use hyper::{Method, StatusCode};
6use lazy_static::lazy_static;
7use serde_derive::{Deserialize, Serialize};
8use std::env;
9
10lazy_static! {
11 static ref PUBSUB_HOST: String = env::var("PUBSUB_EMULATOR_HOST")
12 .map(|host| format!("http://{}", host))
13 .unwrap_or_else(|_| String::from("https://pubsub.googleapis.com"));
14}
15
16#[derive(Deserialize)]
17struct Response {
18 #[serde(alias = "receivedMessages")]
19 received_messages: Option<Vec<Message>>,
20 error: Option<error::Error>,
21}
22
23#[derive(Serialize)]
24struct AckRequest {
25 #[serde(alias = "ackIds")]
26 ack_ids: Vec<String>,
27}
28
29#[derive(Deserialize, Serialize, Clone)]
30pub struct Subscription {
31 #[serde(skip_serializing)]
32 pub name: String,
33 pub topic: Option<String>,
34
35 #[serde(skip)]
36 pub(crate) client: Option<Client>,
37}
38
39impl Subscription {
40 pub async fn acknowledge_messages(&self, ids: Vec<String>) {
41 let client = self
42 .client
43 .as_ref()
44 .expect("Subscription was not created using a client");
45
46 let uri: hyper::Uri = format!("{}/v1/{}:acknowledge", *PUBSUB_HOST, self.name)
47 .parse()
48 .unwrap();
49
50 let json = serde_json::to_string(&AckRequest { ack_ids: ids }).unwrap();
51
52 let mut req = client.request(Method::POST, json);
53 *req.uri_mut() = uri.clone();
54
55 if let Err(e) = client.hyper_client().request(req).await {
56 log::error!("Failed ACK: {}", e);
57 }
58 }
59
60 pub async fn get_messages<T: FromPubSubMessage>(
61 &self,
62 max_messages: i32,
63 ) -> Result<Vec<(Result<T, error::Error>, String)>, error::Error> {
64 let client = self
65 .client
66 .as_ref()
67 .expect("Subscription was not created using a client");
68
69 let uri: hyper::Uri = format!("{}/v1/{}:pull", *PUBSUB_HOST, self.name)
70 .parse()
71 .unwrap();
72
73 let json = format!("{{\"maxMessages\": {}}}", max_messages);
74
75 let mut req = client.request(Method::POST, json);
76 *req.uri_mut() = uri.clone();
77
78 let response = client.hyper_client().request(req).await?;
79
80 if response.status() == StatusCode::NOT_FOUND {
81 return Err(error::Error::PubSub {
82 code: 404,
83 status: "Subscription Not Found".to_string(),
84 message: self.name.clone(),
85 });
86 }
87 let body = hyper::body::aggregate(response).await?;
88 let response: Response = serde_json::from_reader(body.reader())?;
89 if let Some(e) = response.error {
90 return Err(e);
91 }
92 let messages = response
93 .received_messages
94 .unwrap_or_default()
95 .into_iter()
96 .map(|m| (T::from(m.message), m.ack_id))
97 .collect();
98 Ok(messages)
99 }
100
101 pub async fn destroy(self) -> Result<(), error::Error> {
102 let client = self
103 .client
104 .expect("Subscription was not created using a client");
105
106 let uri: hyper::Uri = format!("{}/v1/{}", *PUBSUB_HOST, self.name)
107 .parse()
108 .unwrap();
109
110 let mut req = client.request(Method::DELETE, "");
111 *req.uri_mut() = uri.clone();
112
113 if let Err(e) = client.hyper_client().request(req).await {
114 Err(e.into())
115 } else {
116 Ok(())
117 }
118 }
119
120 pub fn client(&self) -> &Client {
121 self.client.as_ref().unwrap()
122 }
123}