1use crate::client::Client;
2use crate::error;
3use crate::subscription::*;
4use crate::EncodedMessage;
5use hyper::body::Buf;
6use hyper::{Method, StatusCode};
7use lazy_static::lazy_static;
8use rand::distributions::Alphanumeric;
9use rand::{thread_rng, Rng};
10use serde::de::DeserializeOwned;
11use serde_derive::{Deserialize, Serialize};
12use std::env;
13
14lazy_static! {
15 static ref PUBSUB_HOST: String = env::var("PUBSUB_EMULATOR_HOST")
16 .map(|host| format!("http://{}", host))
17 .unwrap_or_else(|_| String::from("https://pubsub.googleapis.com"));
18}
19
20#[derive(Deserialize, Serialize)]
21pub struct Topic {
22 pub name: String,
23
24 #[serde(skip)]
25 pub(crate) client: Option<Client>,
26}
27
28#[derive(Deserialize, Debug, Clone)]
29#[serde(rename_all = "camelCase")]
30pub struct PublishMessageResponse {
31 pub message_ids: Vec<String>,
32}
33
34#[derive(Serialize, Clone)]
35pub struct PublishMessageRequest {
36 pub messages: Vec<EncodedMessage>,
37}
38
39impl Topic {
40 pub async fn subscribe(&self) -> Result<Subscription, error::Error> {
41 let client = self.client.clone();
42
43 let new_subscription = Subscription {
44 name: self.new_subscription_name(),
45 topic: Some(self.name.clone()),
46 client: None,
47 };
48
49 let uri: hyper::Uri = format!("{}/v1/{}", *PUBSUB_HOST, new_subscription.name)
50 .parse()
51 .unwrap();
52
53 let mut sub = self
54 .perform_request::<Subscription, Subscription>(uri, Method::PUT, new_subscription)
55 .await?;
56
57 sub.client = client.clone();
58 Ok(sub)
59 }
60
61 pub async fn publish<T: serde::Serialize>(
62 &self,
63 data: T,
64 ) -> Result<PublishMessageResponse, error::Error> {
65 self.publish_message(EncodedMessage::new(&data, None)).await
66 }
67
68 pub async fn publish_message(
69 &self,
70 message: EncodedMessage,
71 ) -> Result<PublishMessageResponse, error::Error> {
72 let uri: hyper::Uri = format!("{}/v1/{}:publish", *PUBSUB_HOST, self.name)
73 .parse()
74 .unwrap();
75
76 let payload = PublishMessageRequest {
77 messages: vec![message],
78 };
79
80 self.perform_request::<PublishMessageRequest, PublishMessageResponse>(
81 uri,
82 Method::POST,
83 payload,
84 )
85 .await
86 }
87
88 async fn perform_request<T: serde::Serialize, U: DeserializeOwned + Clone>(
89 &self,
90 uri: hyper::Uri,
91 method: Method,
92 data: T,
93 ) -> Result<U, error::Error> {
94 let client = self
95 .client
96 .clone()
97 .expect("Topic must be created using a client");
98
99 let json = serde_json::to_string(&data).expect("Failed to serialize request body.");
100 let mut req = client.request(method, json);
101 *req.uri_mut() = uri;
102
103 let response = client.hyper_client().request(req).await?;
104 match response.status() {
105 StatusCode::NOT_FOUND => Err(error::Error::PubSub {
106 code: 404,
107 status: "Topic Not Found".to_string(),
108 message: self.name.clone(),
109 }),
110 StatusCode::OK => {
111 let body = hyper::body::aggregate(response).await?;
112 serde_json::from_reader(body.reader()).map_err(|e| e.into())
113 }
114 code => {
115 let body = hyper::body::aggregate(response).await?;
116 let mut buf = String::new();
117 use std::io::Read;
118 body.reader().read_to_string(&mut buf)?;
119 Err(error::Error::PubSub {
120 code: code.as_u16() as i32,
121 status: "Error occurred attempting to subscribe".to_string(),
122 message: buf,
123 })
124 }
125 }
126 }
127
128 fn new_subscription_name(&self) -> String {
129 let project = self.client.clone().unwrap().project();
130 let slug = thread_rng()
131 .sample_iter(&Alphanumeric)
132 .take(30)
133 .map(char::from)
134 .collect::<String>();
135
136 format!("projects/{}/subscriptions/RST{}", project, slug)
137 }
138}