gcloud-pubsub 1.5.1

Google Cloud Platform pubsub client library.
Documentation
use std::sync::Arc;

use google_cloud_gax::conn::Channel;
use google_cloud_gax::create_request;
use google_cloud_gax::grpc::Response;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
use google_cloud_googleapis::pubsub::v1::publisher_client::PublisherClient as InternalPublisherClient;
use google_cloud_googleapis::pubsub::v1::{
    DeleteTopicRequest, DetachSubscriptionRequest, DetachSubscriptionResponse, GetTopicRequest,
    ListTopicSnapshotsRequest, ListTopicSubscriptionsRequest, ListTopicsRequest, PublishRequest, PublishResponse,
    Topic, UpdateTopicRequest,
};

use crate::apiv1::conn_pool::ConnectionManager;
use crate::apiv1::PUBSUB_MESSAGE_LIMIT;

#[derive(Clone, Debug)]
pub(crate) struct PublisherClient {
    cm: Arc<ConnectionManager>,
}

#[allow(dead_code)]
impl PublisherClient {
    /// create new publisher client
    pub fn new(cm: ConnectionManager) -> PublisherClient {
        PublisherClient { cm: Arc::new(cm) }
    }

    #[inline]
    fn client(&self) -> InternalPublisherClient<Channel> {
        InternalPublisherClient::new(self.cm.conn())
            .max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
            .max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
    }

    /// create_topic creates the given topic with the given name. See the [resource name rules]
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn create_topic(&self, req: Topic, retry: Option<RetrySetting>) -> Result<Response<Topic>, Status> {
        let name = &req.name;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("name={name}"), req.clone());
            client.create_topic(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }

    /// update_topic updates an existing topic. Note that certain properties of a
    /// topic are not modifiable.
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn update_topic(
        &self,
        req: UpdateTopicRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<Topic>, Status> {
        let name = match &req.topic {
            Some(t) => t.name.as_str(),
            None => "",
        };
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("name={name}"), req.clone());
            client.update_topic(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }

    /// publish adds one or more messages to the topic. Returns NOT_FOUND if the topic does not exist.
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn publish(
        &self,
        req: PublishRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<PublishResponse>, Status> {
        let setting = match retry {
            Some(retry) => retry,
            None => RetrySetting {
                codes: vec![
                    Code::Unavailable,
                    Code::Unknown,
                    Code::Aborted,
                    Code::Cancelled,
                    Code::DeadlineExceeded,
                    Code::ResourceExhausted,
                    Code::Internal,
                ],
                ..Default::default()
            },
        };
        let name = &req.topic;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("name={name}"), req.clone());
            client.publish(request).await.map_transient_err()
        };
        invoke(Some(setting), action).await
    }

    /// get_topic gets the configuration of a topic.
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn get_topic(
        &self,
        req: GetTopicRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<Topic>, Status> {
        let topic = &req.topic;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("topic={topic}"), req.clone());
            client.get_topic(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }

    /// list_topics lists matching topics.
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn list_topics(
        &self,
        mut req: ListTopicsRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Vec<Topic>, Status> {
        let project = &req.project;
        let mut all = vec![];
        //eager loading
        loop {
            let action = || async {
                let mut client = self.client();
                let request = create_request(format!("project={project}"), req.clone());
                client
                    .list_topics(request)
                    .await
                    .map(|d| d.into_inner())
                    .map_transient_err()
            };
            let response = invoke(retry.clone(), action).await?;
            all.extend(response.topics.into_iter());
            if response.next_page_token.is_empty() {
                return Ok(all);
            }
            req.page_token = response.next_page_token;
        }
    }

    /// list_topics lists matching topics.
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn list_topic_subscriptions(
        &self,
        mut req: ListTopicSubscriptionsRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Vec<String>, Status> {
        let topic = &req.topic;
        let mut all = vec![];
        //eager loading
        loop {
            let action = || async {
                let mut client = self.client();
                let request = create_request(format!("topic={topic}"), req.clone());
                client
                    .list_topic_subscriptions(request)
                    .await
                    .map(|d| d.into_inner())
                    .map_transient_err()
            };
            let response = invoke(retry.clone(), action).await?;
            all.extend(response.subscriptions.into_iter());
            if response.next_page_token.is_empty() {
                return Ok(all);
            }
            req.page_token = response.next_page_token;
        }
    }

    /// list_topic_snapshots lists the names of the snapshots on this topic. Snapshots are used in
    /// Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations,
    /// which allow you to manage message acknowledgments in bulk. That is, you can
    /// set the acknowledgment state of messages in an existing subscription to the
    /// state captured by a snapshot.
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn list_topic_snapshots(
        &self,
        mut req: ListTopicSnapshotsRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Vec<String>, Status> {
        let topic = &req.topic;
        let mut all = vec![];
        //eager loading
        loop {
            let action = || async {
                let mut client = self.client();
                let request = create_request(format!("topic={topic}"), req.clone());
                client
                    .list_topic_snapshots(request)
                    .await
                    .map(|d| d.into_inner())
                    .map_transient_err()
            };
            let response = invoke(retry.clone(), action).await?;
            all.extend(response.snapshots.into_iter());
            if response.next_page_token.is_empty() {
                return Ok(all);
            }
            req.page_token = response.next_page_token;
        }
    }

    /// delete_topic deletes the topic with the given name. Returns NOT_FOUND if the topic
    /// does not exist. After a topic is deleted, a new topic may be created with
    /// the same name; this is an entirely new topic with none of the old
    /// configuration or subscriptions. Existing subscriptions to this topic are
    /// not deleted, but their topic field is set to _deleted-topic_.
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn delete_topic(
        &self,
        req: DeleteTopicRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<()>, Status> {
        let topic = &req.topic;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("topic={topic}"), req.clone());
            client.delete_topic(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }

    /// detach_subscription detaches a subscription from this topic. All messages retained in the
    /// subscription are dropped. Subsequent Pull and StreamingPull requests
    /// will return FAILED_PRECONDITION. If the subscription is a push
    /// subscription, pushes to the endpoint will stop.
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn detach_subscription(
        &self,
        req: DetachSubscriptionRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<DetachSubscriptionResponse>, Status> {
        let subscription = &req.subscription;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("subscription={subscription}"), req.clone());
            client.detach_subscription(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }
}