gcp_pubsub/
subscription.rs

1use serde::Serialize;
2use surf::http::Method;
3
4use crate::presenters::{CreateSubscription, PullSubscription, ReceiveMessages};
5use crate::{Client, Error, Message, Topic};
6
7#[derive(Debug)]
8pub struct Subscription {
9  pub name: String,
10  client: crate::Client,
11}
12
13impl Subscription {
14  pub fn new(client: Client, name: &str) -> Subscription {
15    Self {
16      name: format!("projects/{}/subscriptions/{}", client.project(), name),
17      client,
18    }
19  }
20
21  pub async fn pull(&self) -> Result<Vec<Message>, Error> {
22    let url = format!("https://pubsub.googleapis.com/v1/{}:pull", &self.name);
23    let payload = PullSubscription::new();
24    let mut response = self
25      .client
26      .base_request(Method::POST, &url)
27      .body_json(&payload)?
28      .await
29      .unwrap();
30    if response.status().is_success() {
31      let body_json = response.body_string().await;
32
33      body_json
34        .map_err(|_| Error::Unexpected("Deserializing".into()))
35        .and_then(|payload| {
36          let result: Result<ReceiveMessages, serde_json::Error> = serde_json::from_str(&payload);
37          result
38            .and_then(|messages| Ok(messages.received_messages))
39            .map_err(Error::Json)
40        })
41    } else {
42      response
43        .body_string()
44        .await
45        .map_err(|err| Error::Unexpected(format!("{}", err)))
46        .and_then(|json| Err(Error::PubSub(json)))
47    }
48  }
49
50  pub async fn create(topic: &Topic) -> Result<Subscription, Error> {
51    let new_subscription_name = format!("s{}", &nanoid::generate(10));
52    let subscription = Self::new(topic.client.clone(), &new_subscription_name);
53    let url = format!("https://pubsub.googleapis.com/v1/{}", subscription.name);
54    let payload = CreateSubscription::from(&topic);
55    let mut response = subscription
56      .client
57      .base_request(Method::PUT, &url)
58      .body_json(&payload)?
59      .await
60      .unwrap();
61    if response.status().is_success() {
62      return Ok(subscription);
63    } else {
64      response
65        .body_string()
66        .await
67        .map_err(|err| Error::Unexpected(format!("{}", err)))
68        .and_then(|json| Err(Error::PubSub(json)))
69    }
70  }
71}