1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
use crate::client::Client; use crate::error; use crate::subscription::*; use futures::prelude::*; use hyper::Method; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use serde_derive::{Deserialize, Serialize}; #[derive(Deserialize, Serialize)] pub struct Topic { pub name: String, #[serde(skip)] pub(crate) client: Option<Client>, } impl Topic { pub fn subscribe(&self) -> impl Future<Item = Subscription, Error = error::Error> { let client = self .client .clone() .expect("Topic must be created using a client"); let canonical_name = format!( "projects/{}/subscriptions/RST{}", client.project(), thread_rng() .sample_iter(&Alphanumeric) .take(30) .collect::<String>() ); let uri: hyper::Uri = format!("https://pubsub.googleapis.com/v1/{}", canonical_name) .parse() .unwrap(); let json = serde_json::to_string(&Subscription { name: canonical_name, topic: Some(self.name.clone()), client: None, }) .expect("Failed to serialize subscription"); let mut req = client.request(Method::PUT, json); *req.uri_mut() = uri.clone(); client .hyper_client() .request(req) .and_then(|res| res.into_body().concat2()) .from_err::<error::Error>() .and_then(move |body| { let mut sub = serde_json::from_slice::<Subscription>(&body)?; sub.client = Some(client.clone()); Ok(sub) }) .from_err::<error::Error>() } }