1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
use serde::Serialize; use surf::http::Method; use crate::presenters::{CreateSubscription, PullSubscription, ReceiveMessages}; use crate::{Client, Error, Message, Topic}; #[derive(Debug)] pub struct Subscription { pub name: String, client: crate::Client, } impl Subscription { pub fn new(client: Client, name: &str) -> Subscription { Self { name: format!("projects/{}/subscriptions/{}", client.project(), name), client, } } pub async fn pull(&self) -> Result<Vec<Message>, Error> { let url = format!("https://pubsub.googleapis.com/v1/{}:pull", &self.name); let payload = PullSubscription::new(); let mut response = self .client .base_request(Method::POST, &url) .body_json(&payload)? .await .unwrap(); if response.status().is_success() { let body_json = response.body_string().await; body_json .map_err(|_| Error::Unexpected("Deserializing".into())) .and_then(|payload| { let result: Result<ReceiveMessages, serde_json::Error> = serde_json::from_str(&payload); result .and_then(|messages| Ok(messages.received_messages)) .map_err(Error::Json) }) } else { response .body_string() .await .map_err(|err| Error::Unexpected(format!("{}", err))) .and_then(|json| Err(Error::PubSub(json))) } } pub async fn create(topic: &Topic) -> Result<Subscription, Error> { let new_subscription_name = format!("s{}", &nanoid::generate(10)); let subscription = Self::new(topic.client.clone(), &new_subscription_name); let url = format!("https://pubsub.googleapis.com/v1/{}", subscription.name); let payload = CreateSubscription::from(&topic); let mut response = subscription .client .base_request(Method::PUT, &url) .body_json(&payload)? .await .unwrap(); if response.status().is_success() { return Ok(subscription); } else { response .body_string() .await .map_err(|err| Error::Unexpected(format!("{}", err))) .and_then(|json| Err(Error::PubSub(json))) } } }