1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::client::Client;
use crate::error;
use crate::message::{FromPubSubMessage, Message};
use bytes::buf::BufExt as _;
use hyper::{Method, StatusCode};
use serde_derive::{Deserialize, Serialize};

/// Decoded JSON body of a pull response, as consumed by
/// `Subscription::get_messages`.
///
/// Both fields are optional: `get_messages` checks `error` first and returns
/// it as the failure if present; otherwise it reads `received_messages`.
#[derive(Deserialize)]
struct Response {
    /// Messages returned by the pull. Accepted under either the Rust field
    /// name or the camelCase key `receivedMessages`. When absent,
    /// `get_messages` treats it as an empty list.
    #[serde(alias = "receivedMessages")]
    received_messages: Option<Vec<Message>>,
    /// Error object carried in the response body, if any.
    error: Option<error::Error>,
}

/// JSON request body sent by `Subscription::acknowledge_messages`.
#[derive(Serialize)]
struct AckRequest {
    /// Acknowledgement ids of the messages being confirmed.
    // `rename` (not `alias`) is required here: `alias` only affects
    // deserialization, so on this serialize-only struct it was a no-op and
    // the key was emitted as `ack_ids` rather than the documented `ackIds`.
    #[serde(rename = "ackIds")]
    ack_ids: Vec<String>,
}

/// A Pub/Sub subscription together with the client used to talk to the API.
///
/// The value can be (de)serialized with serde, but the `client` handle is
/// never part of the serialized form. A `Subscription` obtained purely through
/// deserialization therefore has no client, and the request methods on this
/// type will panic until one is attached.
#[derive(Deserialize, Serialize, Clone)]
pub struct Subscription {
    /// Resource name of the subscription. It is interpolated verbatim into
    /// the request path after `https://pubsub.googleapis.com/v1/`.
    /// Read when deserializing but omitted when serializing.
    #[serde(skip_serializing)]
    pub name: String,
    /// Presumably the name of the topic this subscription is attached to;
    /// it is not read anywhere in this file — TODO confirm against the caller.
    pub topic: Option<String>,

    /// Client used to build and send requests. Skipped by serde in both
    /// directions; the methods of this type panic when it is `None`.
    #[serde(skip)]
    pub(crate) client: Option<Client>,
}

impl Subscription {
    /// Acknowledges the messages identified by `ids`.
    ///
    /// This is best-effort: the method returns nothing, and any failure
    /// (transport error or a non-success HTTP status) is only reported on
    /// stderr. An empty `ids` list is a no-op and sends no request.
    ///
    /// # Panics
    ///
    /// Panics if the subscription has no client attached, if `self.name`
    /// does not produce a valid URI, or if the request body cannot be
    /// serialized.
    pub async fn acknowledge_messages(&self, ids: Vec<String>) {
        let client = self
            .client
            .as_ref()
            .expect("Subscription was not created using a client");

        // Nothing to acknowledge: avoid a pointless network round trip.
        if ids.is_empty() {
            return;
        }

        let uri: hyper::Uri = format!("https://pubsub.googleapis.com/v1/{}:acknowledge", self.name)
            .parse()
            .unwrap();

        let json = serde_json::to_string(&AckRequest { ack_ids: ids }).unwrap();

        let mut req = client.request(Method::POST, json);
        *req.uri_mut() = uri;

        match client.hyper_client().request(req).await {
            // A completed request can still have been rejected by the server;
            // surface that instead of silently treating it as success.
            Ok(response) if !response.status().is_success() => {
                eprintln!("Failed ACK: HTTP status {}", response.status());
            }
            Ok(_) => {}
            Err(e) => eprintln!("Failed ACK: {}", e),
        }
    }

    /// Pulls up to 100 messages from the subscription and converts each one
    /// into `T`.
    ///
    /// Returns `(messages, ack_ids)`. `ack_ids` holds the acknowledgement id
    /// of *every* received message, in arrival order, including messages
    /// whose conversion into `T` failed. Those failures are logged to stderr
    /// and the message is left out of `messages`, so the two vectors may
    /// differ in length.
    ///
    /// # Errors
    ///
    /// * A `PubSub` error with code 404 when the server answers `404 Not Found`.
    /// * The error object carried in the response body, when present.
    /// * Transport, body-read and JSON-decoding failures, converted into
    ///   `error::Error`.
    ///
    /// # Panics
    ///
    /// Panics if the subscription has no client attached or if `self.name`
    /// does not produce a valid URI.
    pub async fn get_messages<T: FromPubSubMessage>(
        &self,
    ) -> Result<(Vec<T>, Vec<String>), error::Error> {
        let client = self
            .client
            .as_ref()
            .expect("Subscription was not created using a client");

        let uri: hyper::Uri = format!("https://pubsub.googleapis.com/v1/{}:pull", self.name)
            .parse()
            .unwrap();

        let json = r#"{"maxMessages": 100}"#;

        let mut req = client.request(Method::POST, json);
        *req.uri_mut() = uri;

        let response = client.hyper_client().request(req).await?;
        if response.status() == StatusCode::NOT_FOUND {
            return Err(error::Error::PubSub {
                code: 404,
                status: "Subscription Not Found".to_string(),
                message: self.name.clone(),
            });
        }
        let body = hyper::body::aggregate(response).await?;
        let response: Response = serde_json::from_reader(body.reader())?;
        if let Some(e) = response.error {
            return Err(e);
        }

        let messages = response.received_messages.unwrap_or_default();
        let mut ack_ids: Vec<String> = Vec::with_capacity(messages.len());
        let mut packets: Vec<T> = Vec::with_capacity(messages.len());
        // Single pass: the ack id is moved out of each packet (no clone) and
        // recorded before conversion, so it is kept even if conversion fails.
        for packet in messages {
            ack_ids.push(packet.ack_id);
            match T::from(packet.message) {
                Ok(converted) => packets.push(converted),
                Err(e) => eprintln!("Failed converting pubsub {}", e),
            }
        }

        Ok((packets, ack_ids))
    }

    /// Deletes the subscription on the server, consuming `self`.
    ///
    /// Only transport-level failures are reported; the HTTP status of the
    /// response is not inspected.
    ///
    /// # Errors
    ///
    /// Returns the transport error, converted into `error::Error`, when the
    /// request could not be completed.
    ///
    /// # Panics
    ///
    /// Panics if the subscription has no client attached or if `self.name`
    /// does not produce a valid URI.
    pub async fn destroy(self) -> Result<(), error::Error> {
        let client = self
            .client
            .expect("Subscription was not created using a client");

        let uri: hyper::Uri = format!("https://pubsub.googleapis.com/v1/{}", self.name)
            .parse()
            .unwrap();

        let mut req = client.request(Method::DELETE, "");
        *req.uri_mut() = uri;

        client.hyper_client().request(req).await?;
        Ok(())
    }

    /// Returns the client this subscription sends its requests through.
    ///
    /// # Panics
    ///
    /// Panics if the subscription has no client attached.
    pub fn client(&self) -> &Client {
        self.client
            .as_ref()
            .expect("Subscription was not created using a client")
    }
}