use std::sync::Arc;
use google_cloud_gax::conn::Channel;
use google_cloud_gax::create_request;
use google_cloud_gax::grpc::Response;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
use google_cloud_googleapis::pubsub::v1::publisher_client::PublisherClient as InternalPublisherClient;
use google_cloud_googleapis::pubsub::v1::{
DeleteTopicRequest, DetachSubscriptionRequest, DetachSubscriptionResponse, GetTopicRequest,
ListTopicSnapshotsRequest, ListTopicSubscriptionsRequest, ListTopicsRequest, PublishRequest, PublishResponse,
Topic, UpdateTopicRequest,
};
use crate::apiv1::conn_pool::ConnectionManager;
use crate::apiv1::PUBSUB_MESSAGE_LIMIT;
#[derive(Clone, Debug)]
pub(crate) struct PublisherClient {
cm: Arc<ConnectionManager>,
}
#[allow(dead_code)]
impl PublisherClient {
pub fn new(cm: ConnectionManager) -> PublisherClient {
PublisherClient { cm: Arc::new(cm) }
}
#[inline]
fn client(&self) -> InternalPublisherClient<Channel> {
InternalPublisherClient::new(self.cm.conn())
.max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
.max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
}
#[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
pub async fn create_topic(&self, req: Topic, retry: Option<RetrySetting>) -> Result<Response<Topic>, Status> {
let name = &req.name;
let action = || async {
let mut client = self.client();
let request = create_request(format!("name={name}"), req.clone());
client.create_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
#[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
pub async fn update_topic(
&self,
req: UpdateTopicRequest,
retry: Option<RetrySetting>,
) -> Result<Response<Topic>, Status> {
let name = match &req.topic {
Some(t) => t.name.as_str(),
None => "",
};
let action = || async {
let mut client = self.client();
let request = create_request(format!("name={name}"), req.clone());
client.update_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
#[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
pub async fn publish(
&self,
req: PublishRequest,
retry: Option<RetrySetting>,
) -> Result<Response<PublishResponse>, Status> {
let setting = match retry {
Some(retry) => retry,
None => RetrySetting {
codes: vec![
Code::Unavailable,
Code::Unknown,
Code::Aborted,
Code::Cancelled,
Code::DeadlineExceeded,
Code::ResourceExhausted,
Code::Internal,
],
..Default::default()
},
};
let name = &req.topic;
let action = || async {
let mut client = self.client();
let request = create_request(format!("name={name}"), req.clone());
client.publish(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
#[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
pub async fn get_topic(
&self,
req: GetTopicRequest,
retry: Option<RetrySetting>,
) -> Result<Response<Topic>, Status> {
let topic = &req.topic;
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.get_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
#[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
pub async fn list_topics(
&self,
mut req: ListTopicsRequest,
retry: Option<RetrySetting>,
) -> Result<Vec<Topic>, Status> {
let project = &req.project;
let mut all = vec![];
loop {
let action = || async {
let mut client = self.client();
let request = create_request(format!("project={project}"), req.clone());
client
.list_topics(request)
.await
.map(|d| d.into_inner())
.map_transient_err()
};
let response = invoke(retry.clone(), action).await?;
all.extend(response.topics.into_iter());
if response.next_page_token.is_empty() {
return Ok(all);
}
req.page_token = response.next_page_token;
}
}
#[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
pub async fn list_topic_subscriptions(
&self,
mut req: ListTopicSubscriptionsRequest,
retry: Option<RetrySetting>,
) -> Result<Vec<String>, Status> {
let topic = &req.topic;
let mut all = vec![];
loop {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client
.list_topic_subscriptions(request)
.await
.map(|d| d.into_inner())
.map_transient_err()
};
let response = invoke(retry.clone(), action).await?;
all.extend(response.subscriptions.into_iter());
if response.next_page_token.is_empty() {
return Ok(all);
}
req.page_token = response.next_page_token;
}
}
#[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
pub async fn list_topic_snapshots(
&self,
mut req: ListTopicSnapshotsRequest,
retry: Option<RetrySetting>,
) -> Result<Vec<String>, Status> {
let topic = &req.topic;
let mut all = vec![];
loop {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client
.list_topic_snapshots(request)
.await
.map(|d| d.into_inner())
.map_transient_err()
};
let response = invoke(retry.clone(), action).await?;
all.extend(response.snapshots.into_iter());
if response.next_page_token.is_empty() {
return Ok(all);
}
req.page_token = response.next_page_token;
}
}
#[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
pub async fn delete_topic(
&self,
req: DeleteTopicRequest,
retry: Option<RetrySetting>,
) -> Result<Response<()>, Status> {
let topic = &req.topic;
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.delete_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
#[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
pub async fn detach_subscription(
&self,
req: DetachSubscriptionRequest,
retry: Option<RetrySetting>,
) -> Result<Response<DetachSubscriptionResponse>, Status> {
let subscription = &req.subscription;
let action = || async {
let mut client = self.client();
let request = create_request(format!("subscription={subscription}"), req.clone());
client.detach_subscription(request).await.map_transient_err()
};
invoke(retry, action).await
}
}