gcp_pubsub/
client.rs

1use crate::Error;
2use crate::Topic;
3use goauth::auth::JwtClaims;
4use goauth::credentials::Credentials;
5use goauth::scopes::Scope;
6use smpl_jwt::Jwt;
7use surf::http::Method;
8use surf::middleware::HttpClient;
9use surf::url::Url;
10use surf::Request;
11
12#[derive(Clone, Debug)]
13pub struct Client {
14  credentials: Credentials,
15  access_token: Option<String>,
16}
17
18impl Client {
19  pub fn new(credentials: Credentials) -> Self {
20    Client {
21      access_token: None,
22      credentials,
23    }
24  }
25
26  pub fn base_request(&self, method: Method, url: &str) -> Request<impl HttpClient> {
27    let parsed_url = Url::parse(url).unwrap();
28    Request::new(method, parsed_url).set_header(
29      "Authorization",
30      format!(
31        "Bearer {}",
32        self
33          .access_token
34          .clone()
35          .unwrap_or("token_doesnt_exist".into())
36      ),
37    )
38  }
39
40  pub fn topic(&self, name: &str) -> Topic {
41    Topic::new(self.clone(), name)
42  }
43
44  pub async fn create_topic(&self, name: &str) -> Result<Topic, Error> {
45    Topic::create(self.clone(), name).await
46  }
47
48  pub fn project(&self) -> String {
49    self.credentials.project()
50  }
51
52  pub fn refresh_token(&mut self) -> Result<(), Error> {
53    match self.get_token() {
54      Ok(token) => {
55        self.access_token = Some(token.access_token().to_owned());
56        Ok(())
57      }
58      Err(e) => Err(Error::from(e)),
59    }
60  }
61
62  fn get_token(&mut self) -> Result<goauth::auth::Token, goauth::error::GOErr> {
63    let claims = JwtClaims::new(
64      self.credentials.iss(),
65      &Scope::PubSub,
66      self.credentials.token_uri(),
67      None,
68      None,
69    );
70    let jwt = Jwt::new(claims, self.credentials.rsa_key().unwrap(), None);
71    goauth::get_token_with_creds(&jwt, &self.credentials)
72  }
73}