use crate::api::{publisher_client::PublisherClient, subscriber_client::SubscriberClient};
use crate::certs::get_pem;
use crate::error::Error;
use crate::subscription::Subscription;
use crate::topic::Topic;
use google_auth::TokenManager;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::{IntoRequest, Request};
#[derive(Clone)]
pub struct Client {
pub(crate) project_id: String,
pub(crate) publisher: PublisherClient<Channel>,
pub(crate) subscriber: SubscriberClient<Channel>,
pub(crate) token_manager: TokenManager,
}
impl Client {
pub(crate) const DOMAIN_NAME: &'static str = "pubsub.googleapis.com";
pub(crate) const SCOPES: [&'static str; 2] = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/pubsub",
];
pub async fn new(project_id: &str) -> Result<Self, Error> {
let pem = get_pem();
let channel = match std::env::var("PUBSUB_EMULATOR_HOST") {
Ok(host) => {
let (host, port) = host.split_once(":").unwrap();
let uri = tonic::transport::Uri::builder()
.scheme("http")
.authority(format!("{}:{}", host, port).as_str())
.path_and_query("")
.build()
.unwrap();
let channel = Channel::builder(uri).connect().await?;
channel
}
Err(_) => {
let uri = tonic::transport::Uri::builder()
.scheme("https")
.authority(Self::DOMAIN_NAME)
.path_and_query("")
.build()
.unwrap();
let tls_config = ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(&pem))
.domain_name(uri.host().unwrap());
let channel = Channel::builder(uri)
.tls_config(tls_config)?
.connect()
.await?;
channel
}
};
let subscriber = SubscriberClient::new(channel.clone());
let publisher = PublisherClient::new(channel);
let token_manager = TokenManager::new(&Self::SCOPES).await?;
Ok(Self {
project_id: project_id.to_string(),
subscriber,
publisher,
token_manager,
})
}
pub(crate) async fn insert_authz_token<T: IntoRequest<T>>(
&mut self,
request: T,
) -> Result<Request<T>, Error> {
let mut request = request.into_request();
let token = self.token_manager.token().await?;
let metadata = request.metadata_mut();
metadata.insert("authorization", token.parse()?);
Ok(request)
}
pub fn subscription(&self, subscription_name: &str) -> Subscription {
let name = format!(
"projects/{}/subscriptions/{}",
&self.project_id, subscription_name
);
Subscription::new(self.clone(), name)
}
pub fn topic(&self, topic_name: &str) -> Topic {
let name = format!("projects/{}/topics/{}", self.project_id, topic_name);
Topic::new(self.clone(), name)
}
}