1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use crate::client::Client;
use crate::error;
use crate::subscription::*;
use crate::EncodedMessage;
use hyper::body::Buf;
use hyper::{Method, StatusCode};
use lazy_static::lazy_static;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use serde::de::DeserializeOwned;
use serde_derive::{Deserialize, Serialize};
use std::env;
lazy_static! {
    /// Base URL that every Pub/Sub request in this module is sent to.
    ///
    /// If the `PUBSUB_EMULATOR_HOST` environment variable can be read, its value is used as the
    /// host of a plain `http://` URL; otherwise the public `https://pubsub.googleapis.com`
    /// endpoint is used. The value is computed once, on first access.
    static ref PUBSUB_HOST: String = match env::var("PUBSUB_EMULATOR_HOST") {
        Ok(host) => format!("http://{}", host),
        Err(_) => String::from("https://pubsub.googleapis.com"),
    };
}
/// A Pub/Sub topic, identified by its resource name, together with the client used to reach it.
#[derive(Deserialize, Serialize)]
pub struct Topic {
/// Resource name of the topic; it is interpolated verbatim into request URLs as `/v1/{name}`.
pub name: String,
/// Client used to issue requests for this topic.
///
/// Skipped by serde in both directions, so a `Topic` obtained through deserialization holds
/// `None` here. The methods on `Topic` that talk to the service panic when this is `None`.
#[serde(skip)]
pub(crate) client: Option<Client>,
}
/// Body of a successful publish response, deserialized from JSON with camelCase keys.
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct PublishMessageResponse {
/// Contents of the `messageIds` array in the response.
// NOTE(review): presumably one server-assigned ID per published message — confirm against the
// Pub/Sub API reference.
pub message_ids: Vec<String>,
}
/// JSON request body sent by `Topic::publish_message` to the `:publish` endpoint.
#[derive(Serialize, Clone)]
pub struct PublishMessageRequest {
/// Messages to publish; serialized under the `messages` key.
pub messages: Vec<EncodedMessage>,
}
impl Topic {
pub async fn subscribe(&self) -> Result<Subscription, error::Error> {
let client = self.client.clone();
let new_subscription = Subscription {
name: self.new_subscription_name(),
topic: Some(self.name.clone()),
client: None,
};
let uri: hyper::Uri = format!("{}/v1/{}", *PUBSUB_HOST, new_subscription.name)
.parse()
.unwrap();
let mut sub = self
.perform_request::<Subscription, Subscription>(uri, Method::PUT, new_subscription)
.await?;
sub.client = client.clone();
Ok(sub)
}
pub async fn publish<T: serde::Serialize>(
&self,
data: T,
) -> Result<PublishMessageResponse, error::Error> {
self.publish_message(EncodedMessage::new(&data, None)).await
}
pub async fn publish_message(
&self,
message: EncodedMessage,
) -> Result<PublishMessageResponse, error::Error> {
let uri: hyper::Uri = format!("{}/v1/{}:publish", *PUBSUB_HOST, self.name)
.parse()
.unwrap();
let payload = PublishMessageRequest {
messages: vec![message],
};
self.perform_request::<PublishMessageRequest, PublishMessageResponse>(
uri,
Method::POST,
payload,
)
.await
}
async fn perform_request<T: serde::Serialize, U: DeserializeOwned + Clone>(
&self,
uri: hyper::Uri,
method: Method,
data: T,
) -> Result<U, error::Error> {
let client = self
.client
.clone()
.expect("Topic must be created using a client");
let json = serde_json::to_string(&data).expect("Failed to serialize request body.");
let mut req = client.request(method, json);
*req.uri_mut() = uri;
let response = client.hyper_client().request(req).await?;
match response.status() {
StatusCode::NOT_FOUND => Err(error::Error::PubSub {
code: 404,
status: "Topic Not Found".to_string(),
message: self.name.clone(),
}),
StatusCode::OK => {
let body = hyper::body::aggregate(response).await?;
serde_json::from_reader(body.reader()).map_err(|e| e.into())
}
code => {
let body = hyper::body::aggregate(response).await?;
let mut buf = String::new();
use std::io::Read;
body.reader().read_to_string(&mut buf)?;
Err(error::Error::PubSub {
code: code.as_u16() as i32,
status: "Error occurred attempting to subscribe".to_string(),
message: buf,
})
}
}
}
fn new_subscription_name(&self) -> String {
let project = self.client.clone().unwrap().project();
let slug = thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect::<String>();
format!("projects/{}/subscriptions/RST{}", project, slug)
}
}