1mod error;
2mod publisher;
3mod subscriber;
4
5pub use error::*;
6pub use publisher::*;
7pub use subscriber::*;
8
9use goauth::{auth::JwtClaims, credentials::Credentials, fetcher::TokenFetcher, scopes::Scope};
10use reqwest::Response;
11use serde::Serialize;
12use smpl_jwt::Jwt;
13use std::{
14 env,
15 fmt::{self, Debug, Formatter},
16 time::Duration,
17};
18
19const BASE_URL_ENV_VAR: &str = "PUB_SUB_BASE_URL";
20const DEFAULT_BASE_URL: &str = "https://pubsub.googleapis.com";
21
22pub struct PubSubClient {
23 project_url: String,
24 token_fetcher: TokenFetcher,
25 reqwest_client: reqwest::Client,
26}
27
28impl PubSubClient {
29 pub fn new<T>(key_path: T, refresh_buffer: Duration) -> Result<Self, Error>
30 where
31 T: AsRef<str>,
32 {
33 let key_path = key_path.as_ref();
34 let credentials =
35 Credentials::from_file(key_path).map_err(|source| Error::Initialization {
36 reason: format!("missing or malformed service account key at `{key_path}`"),
37 source: source.into(),
38 })?;
39
40 let base_url = env::var(BASE_URL_ENV_VAR).unwrap_or_else(|_| DEFAULT_BASE_URL.to_string());
41 let project_id = credentials.project();
42 let project_url = format!("{base_url}/v1/projects/{project_id}");
43
44 let jwt = Jwt::new(
45 JwtClaims::new(
46 credentials.iss(),
47 &Scope::PubSub,
48 credentials.token_uri(),
49 None,
50 None,
51 ),
52 credentials
53 .rsa_key()
54 .map_err(|source| Error::Initialization {
55 reason: format!("malformed private key in service account key at `{key_path}`"),
56 source: source.into(),
57 })?,
58 None,
59 );
60
61 let refresh_buffer = refresh_buffer
62 .try_into()
63 .map_err(|source| Error::Initialization {
64 reason: format!("invalid refresh_buffer `{refresh_buffer:?}`"),
65 source: Box::new(source),
66 })?;
67
68 Ok(Self {
69 project_url,
70 token_fetcher: TokenFetcher::new(jwt, credentials, refresh_buffer),
71 reqwest_client: reqwest::Client::new(),
72 })
73 }
74
75 async fn send_request<R>(
76 &self,
77 url: &str,
78 request: &R,
79 timeout: Option<Duration>,
80 ) -> Result<Response, Error>
81 where
82 R: Serialize,
83 {
84 let token = self.token_fetcher.fetch_token().await.map_err(Box::new)?;
85
86 let request = self
87 .reqwest_client
88 .post(url)
89 .bearer_auth(token.access_token())
90 .json(request);
91 let request = timeout.into_iter().fold(request, |r, t| r.timeout(t));
92
93 request
94 .send()
95 .await
96 .map_err(Error::HttpServiceCommunication)
97 }
98}
99
100impl Debug for PubSubClient {
101 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
102 f.debug_struct("PubSubClient")
103 .field("project_url", &self.project_url)
104 .finish()
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use super::{Error, PubSubClient};
111 use serde::Deserialize;
112 use std::time::Duration;
113
114 #[derive(Debug, Deserialize, PartialEq, Eq)]
115 enum Message {
116 Foo { text: String },
117 Bar { text: String },
118 }
119
120 #[test]
121 fn test_new_err_non_existent_key() {
122 let result = PubSubClient::new("non_existent", Duration::from_secs(30));
123 assert!(result.is_err());
124 match result.unwrap_err() {
125 Error::Initialization {
126 reason: _,
127 source: _,
128 } => (),
129 other => panic!("Expected Error::InvalidServiceAccountKey, but was `{other}`"),
130 }
131 }
132
133 #[test]
134 fn test_new_err_invalid_key() {
135 let result = PubSubClient::new("Cargo.toml", Duration::from_secs(30));
136 assert!(result.is_err());
137 match result.unwrap_err() {
138 Error::Initialization {
139 reason: _,
140 source: _,
141 } => (),
142 other => panic!("Expected Error::InvalidServiceAccountKey, but was `{other}`"),
143 }
144 }
145
146 #[test]
147 fn test_new_err_invalid_private_key() {
148 let result = PubSubClient::new("tests/invalid_key.json", Duration::from_secs(30));
149 assert!(result.is_err());
150 match result.unwrap_err() {
151 Error::Initialization {
152 reason: _,
153 source: _,
154 } => (),
155 other => panic!("Expected Error::InvalidPrivateKey, but was `{other}`"),
156 }
157 }
158}