gcp_pubsub/
subscription.rs1use serde::Serialize;
2use surf::http::Method;
3
4use crate::presenters::{CreateSubscription, PullSubscription, ReceiveMessages};
5use crate::{Client, Error, Message, Topic};
6
7#[derive(Debug)]
8pub struct Subscription {
9 pub name: String,
10 client: crate::Client,
11}
12
13impl Subscription {
14 pub fn new(client: Client, name: &str) -> Subscription {
15 Self {
16 name: format!("projects/{}/subscriptions/{}", client.project(), name),
17 client,
18 }
19 }
20
21 pub async fn pull(&self) -> Result<Vec<Message>, Error> {
22 let url = format!("https://pubsub.googleapis.com/v1/{}:pull", &self.name);
23 let payload = PullSubscription::new();
24 let mut response = self
25 .client
26 .base_request(Method::POST, &url)
27 .body_json(&payload)?
28 .await
29 .unwrap();
30 if response.status().is_success() {
31 let body_json = response.body_string().await;
32
33 body_json
34 .map_err(|_| Error::Unexpected("Deserializing".into()))
35 .and_then(|payload| {
36 let result: Result<ReceiveMessages, serde_json::Error> = serde_json::from_str(&payload);
37 result
38 .and_then(|messages| Ok(messages.received_messages))
39 .map_err(Error::Json)
40 })
41 } else {
42 response
43 .body_string()
44 .await
45 .map_err(|err| Error::Unexpected(format!("{}", err)))
46 .and_then(|json| Err(Error::PubSub(json)))
47 }
48 }
49
50 pub async fn create(topic: &Topic) -> Result<Subscription, Error> {
51 let new_subscription_name = format!("s{}", &nanoid::generate(10));
52 let subscription = Self::new(topic.client.clone(), &new_subscription_name);
53 let url = format!("https://pubsub.googleapis.com/v1/{}", subscription.name);
54 let payload = CreateSubscription::from(&topic);
55 let mut response = subscription
56 .client
57 .base_request(Method::PUT, &url)
58 .body_json(&payload)?
59 .await
60 .unwrap();
61 if response.status().is_success() {
62 return Ok(subscription);
63 } else {
64 response
65 .body_string()
66 .await
67 .map_err(|err| Error::Unexpected(format!("{}", err)))
68 .and_then(|json| Err(Error::PubSub(json)))
69 }
70 }
71}