1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use crate::client::Client;
use crate::error;
use crate::subscription::*;
use futures::prelude::*;
use hyper::Method;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use serde_derive::{Deserialize, Serialize};

/// A Pub/Sub topic that subscriptions can be created against.
///
/// Only `name` takes part in (de)serialization; `client` is marked `#[serde(skip)]`.
#[derive(Deserialize, Serialize)]
pub struct Topic {
    /// Name of the topic; sent as the `topic` field of subscriptions created via `subscribe`.
    // NOTE(review): presumably the full resource path of the topic — confirm against callers.
    pub name: String,

    /// Client used to issue requests on behalf of this topic.
    ///
    /// Skipped by serde, so it is never written out and is `None` after deserialization
    /// until the crate fills it in. `subscribe` panics while this is `None`.
    #[serde(skip)]
    pub(crate) client: Option<Client>,
}

impl Topic {
    /// Creates a new subscription attached to this topic.
    ///
    /// A subscription name of the form
    /// `projects/<project>/subscriptions/RST<30 random alphanumeric characters>` is
    /// generated, and a `PUT` request carrying the JSON-encoded `Subscription` is sent to
    /// `https://pubsub.googleapis.com/v1/<name>`. The response body is deserialized into a
    /// `Subscription`, which receives a copy of this topic's client before being yielded.
    ///
    /// # Errors
    ///
    /// The returned future fails if the HTTP request fails or if the response body cannot
    /// be deserialized as a `Subscription`.
    ///
    /// NOTE(review): the HTTP status code is not inspected, so an error response from the
    /// service presumably surfaces as a deserialization error — confirm and consider mapping
    /// non-success statuses to a dedicated error.
    ///
    /// # Panics
    ///
    /// Panics if this topic has no client attached, if the generated URL is not a valid
    /// URI (e.g. the project id contains characters that are illegal in a URI), or if the
    /// subscription cannot be serialized to JSON.
    pub fn subscribe(&self) -> impl Future<Item = Subscription, Error = error::Error> {
        let client = self
            .client
            .clone()
            .expect("Topic must be created using a client");

        // The fixed `RST` prefix guarantees the generated id starts with letters even when
        // the random part begins with a digit.
        let canonical_name = format!(
            "projects/{}/subscriptions/RST{}",
            client.project(),
            thread_rng()
                .sample_iter(&Alphanumeric)
                .take(30)
                .collect::<String>()
        );
        let uri: hyper::Uri = format!("https://pubsub.googleapis.com/v1/{}", canonical_name)
            .parse()
            .expect("Generated subscription URL must be a valid URI");

        let json = serde_json::to_string(&Subscription {
            name: canonical_name,
            topic: Some(self.name.clone()),
            client: None,
        })
        .expect("Failed to serialize subscription");

        let mut req = client.request(Method::PUT, json);
        // `uri` is not needed afterwards, so it is moved into the request instead of cloned.
        *req.uri_mut() = uri;

        client
            .hyper_client()
            .request(req)
            .and_then(|res| res.into_body().concat2())
            .from_err::<error::Error>()
            .and_then(move |body| {
                let mut sub = serde_json::from_slice::<Subscription>(&body)?;
                // The closure owns `client` and runs at most once, so hand it over directly.
                sub.client = Some(client);
                Ok(sub)
            })
    }
}