1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use serde::Serialize;
use surf::http::Method;

use crate::presenters::{CreateSubscription, PullSubscription, ReceiveMessages};
use crate::{Client, Error, Message, Topic};

/// Handle to a single Pub/Sub subscription, bound to the client used to reach it.
#[derive(Debug)]
pub struct Subscription {
  /// Subscription resource path; `Subscription::new` sets it to
  /// `projects/{project}/subscriptions/{name}`.
  pub name: String,
  /// Client used to build every request issued for this subscription.
  client: crate::Client,
}

impl Subscription {
  pub fn new(client: Client, name: &str) -> Subscription {
    Self {
      name: format!("projects/{}/subscriptions/{}", client.project(), name),
      client,
    }
  }

  pub async fn pull(&self) -> Result<Vec<Message>, Error> {
    let url = format!("https://pubsub.googleapis.com/v1/{}:pull", &self.name);
    let payload = PullSubscription::new();
    let mut response = self
      .client
      .base_request(Method::POST, &url)
      .body_json(&payload)?
      .await
      .unwrap();
    if response.status().is_success() {
      let body_json = response.body_string().await;

      body_json
        .map_err(|_| Error::Unexpected("Deserializing".into()))
        .and_then(|payload| {
          let result: Result<ReceiveMessages, serde_json::Error> = serde_json::from_str(&payload);
          result
            .and_then(|messages| Ok(messages.received_messages))
            .map_err(Error::Json)
        })
    } else {
      response
        .body_string()
        .await
        .map_err(|err| Error::Unexpected(format!("{}", err)))
        .and_then(|json| Err(Error::PubSub(json)))
    }
  }

  pub async fn create(topic: &Topic) -> Result<Subscription, Error> {
    let new_subscription_name = format!("s{}", &nanoid::generate(10));
    let subscription = Self::new(topic.client.clone(), &new_subscription_name);
    let url = format!("https://pubsub.googleapis.com/v1/{}", subscription.name);
    let payload = CreateSubscription::from(&topic);
    let mut response = subscription
      .client
      .base_request(Method::PUT, &url)
      .body_json(&payload)?
      .await
      .unwrap();
    if response.status().is_success() {
      return Ok(subscription);
    } else {
      response
        .body_string()
        .await
        .map_err(|err| Error::Unexpected(format!("{}", err)))
        .and_then(|json| Err(Error::PubSub(json)))
    }
  }
}