pub_sub_client/
lib.rs

1mod error;
2mod publisher;
3mod subscriber;
4
5pub use error::*;
6pub use publisher::*;
7pub use subscriber::*;
8
9use goauth::{auth::JwtClaims, credentials::Credentials, fetcher::TokenFetcher, scopes::Scope};
10use reqwest::Response;
11use serde::Serialize;
12use smpl_jwt::Jwt;
13use std::{
14    env,
15    fmt::{self, Debug, Formatter},
16    time::Duration,
17};
18
19const BASE_URL_ENV_VAR: &str = "PUB_SUB_BASE_URL";
20const DEFAULT_BASE_URL: &str = "https://pubsub.googleapis.com";
21
22pub struct PubSubClient {
23    project_url: String,
24    token_fetcher: TokenFetcher,
25    reqwest_client: reqwest::Client,
26}
27
28impl PubSubClient {
29    pub fn new<T>(key_path: T, refresh_buffer: Duration) -> Result<Self, Error>
30    where
31        T: AsRef<str>,
32    {
33        let key_path = key_path.as_ref();
34        let credentials =
35            Credentials::from_file(key_path).map_err(|source| Error::Initialization {
36                reason: format!("missing or malformed service account key at `{key_path}`"),
37                source: source.into(),
38            })?;
39
40        let base_url = env::var(BASE_URL_ENV_VAR).unwrap_or_else(|_| DEFAULT_BASE_URL.to_string());
41        let project_id = credentials.project();
42        let project_url = format!("{base_url}/v1/projects/{project_id}");
43
44        let jwt = Jwt::new(
45            JwtClaims::new(
46                credentials.iss(),
47                &Scope::PubSub,
48                credentials.token_uri(),
49                None,
50                None,
51            ),
52            credentials
53                .rsa_key()
54                .map_err(|source| Error::Initialization {
55                    reason: format!("malformed private key in service account key at `{key_path}`"),
56                    source: source.into(),
57                })?,
58            None,
59        );
60
61        let refresh_buffer = refresh_buffer
62            .try_into()
63            .map_err(|source| Error::Initialization {
64                reason: format!("invalid refresh_buffer `{refresh_buffer:?}`"),
65                source: Box::new(source),
66            })?;
67
68        Ok(Self {
69            project_url,
70            token_fetcher: TokenFetcher::new(jwt, credentials, refresh_buffer),
71            reqwest_client: reqwest::Client::new(),
72        })
73    }
74
75    async fn send_request<R>(
76        &self,
77        url: &str,
78        request: &R,
79        timeout: Option<Duration>,
80    ) -> Result<Response, Error>
81    where
82        R: Serialize,
83    {
84        let token = self.token_fetcher.fetch_token().await.map_err(Box::new)?;
85
86        let request = self
87            .reqwest_client
88            .post(url)
89            .bearer_auth(token.access_token())
90            .json(request);
91        let request = timeout.into_iter().fold(request, |r, t| r.timeout(t));
92
93        request
94            .send()
95            .await
96            .map_err(Error::HttpServiceCommunication)
97    }
98}
99
100impl Debug for PubSubClient {
101    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
102        f.debug_struct("PubSubClient")
103            .field("project_url", &self.project_url)
104            .finish()
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use super::{Error, PubSubClient};
111    use serde::Deserialize;
112    use std::time::Duration;
113
114    #[derive(Debug, Deserialize, PartialEq, Eq)]
115    enum Message {
116        Foo { text: String },
117        Bar { text: String },
118    }
119
120    #[test]
121    fn test_new_err_non_existent_key() {
122        let result = PubSubClient::new("non_existent", Duration::from_secs(30));
123        assert!(result.is_err());
124        match result.unwrap_err() {
125            Error::Initialization {
126                reason: _,
127                source: _,
128            } => (),
129            other => panic!("Expected Error::InvalidServiceAccountKey, but was `{other}`"),
130        }
131    }
132
133    #[test]
134    fn test_new_err_invalid_key() {
135        let result = PubSubClient::new("Cargo.toml", Duration::from_secs(30));
136        assert!(result.is_err());
137        match result.unwrap_err() {
138            Error::Initialization {
139                reason: _,
140                source: _,
141            } => (),
142            other => panic!("Expected Error::InvalidServiceAccountKey, but was `{other}`"),
143        }
144    }
145
146    #[test]
147    fn test_new_err_invalid_private_key() {
148        let result = PubSubClient::new("tests/invalid_key.json", Duration::from_secs(30));
149        assert!(result.is_err());
150        match result.unwrap_err() {
151            Error::Initialization {
152                reason: _,
153                source: _,
154            } => (),
155            other => panic!("Expected Error::InvalidPrivateKey, but was `{other}`"),
156        }
157    }
158}