google-cloud-pubsub 1.0.0

Google Cloud Client Libraries for Rust - Pub/Sub
Documentation
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::stub::{Stub, TonicStreaming};
use crate::Result;
use crate::generated::gapic_dataplane::stub::dynamic::Subscriber as GapicStub;
pub(super) use crate::generated::gapic_dataplane::transport::Subscriber as Transport;
use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse};
use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult, Streaming};
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::ReceiverStream;

mod info {
    use std::sync::LazyLock;

    const NAME: &str = env!("CARGO_PKG_NAME");
    const VERSION: &str = env!("CARGO_PKG_VERSION");
    pub(super) static X_GOOG_API_CLIENT_HEADER: LazyLock<String> = LazyLock::new(|| {
        let ac = gaxi::api_header::XGoogApiClient {
            name: NAME,
            version: VERSION,
            library_type: gaxi::api_header::GCCL,
        };
        ac.grpc_header_value()
    });
}

impl TonicStreaming for Streaming<StreamingPullResponse> {
    async fn next_message(&mut self) -> TonicResult<Option<StreamingPullResponse>> {
        self.message().await
    }
}

#[async_trait::async_trait]
impl Stub for Transport {
    type Stream = Streaming<StreamingPullResponse>;
    async fn streaming_pull(
        &self,
        request_params: &str,
        request_rx: Receiver<StreamingPullRequest>,
        options: crate::RequestOptions,
    ) -> Result<TonicResponse<Self::Stream>> {
        use gaxi::grpc::tonic::{Extensions, GrpcMethod};
        let request = ReceiverStream::new(request_rx);
        let extensions = {
            let mut e = Extensions::new();
            e.insert(GrpcMethod::new(
                "google.pubsub.v1.Subscriber",
                "StreamingPull",
            ));
            e
        };
        let path =
            http::uri::PathAndQuery::from_static("/google.pubsub.v1.Subscriber/StreamingPull");
        self.inner
            .bidi_stream(
                extensions,
                path,
                request,
                options,
                &info::X_GOOG_API_CLIENT_HEADER,
                request_params,
            )
            .await
    }

    async fn modify_ack_deadline(
        &self,
        req: crate::model::ModifyAckDeadlineRequest,
        options: crate::RequestOptions,
    ) -> Result<crate::Response<()>> {
        GapicStub::modify_ack_deadline(self, req, options).await
    }

    async fn acknowledge(
        &self,
        req: crate::model::AcknowledgeRequest,
        options: crate::RequestOptions,
    ) -> Result<crate::Response<()>> {
        GapicStub::acknowledge(self, req, options).await
    }
}

#[cfg(test)]
pub(super) mod tests {
    use super::*;
    use crate::google::pubsub::v1::ReceivedMessage;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use pubsub_grpc_mock::google::pubsub::v1;
    use pubsub_grpc_mock::{MockSubscriber, start};

    async fn test_transport(endpoint: String) -> anyhow::Result<Transport> {
        let mut config = gaxi::options::ClientConfig::default();
        config.cred = Some(Anonymous::new().build());
        config.endpoint = Some(endpoint);
        Ok(Transport::new(config).await?)
    }

    // Both crates have their own copies of the protos. We can just serialize
    // then deserialize to convert between the two, as performance is not a
    // concern for these unit tests.
    fn convert(pb: &StreamingPullResponse) -> v1::StreamingPullResponse {
        use prost::Message;
        let v = pb.encode_to_vec();
        v1::StreamingPullResponse::decode(v.as_slice()).expect("encoding is always valid.")
    }

    #[tokio::test]
    async fn streaming_pull() -> anyhow::Result<()> {
        let (response_tx, response_rx) = tokio::sync::mpsc::channel(1);
        let expected = StreamingPullResponse {
            received_messages: vec![ReceivedMessage {
                ack_id: "test-ack-id".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        response_tx.send(Ok(convert(&expected))).await?;

        let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);
        request_tx.send(StreamingPullRequest::default()).await?;

        let mut mock = MockSubscriber::new();
        mock.expect_streaming_pull().return_once(|request| {
            let metadata = request.metadata();
            assert_eq!(
                metadata
                    .get("x-goog-request-params")
                    .expect("routing header missing"),
                "subscription=projects/p/subscriptions/s"
            );
            Ok(TonicResponse::from(response_rx))
        });
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
        let transport = test_transport(endpoint).await?;
        let mut stream = Stub::streaming_pull(
            &transport,
            "subscription=projects/p/subscriptions/s",
            request_rx,
            crate::RequestOptions::default(),
        )
        .await?
        .into_inner();

        assert_eq!(stream.next_message().await?, Some(expected));

        drop(response_tx);
        assert_eq!(stream.next_message().await?, None);

        Ok(())
    }

    #[tokio::test]
    async fn modify_ack_deadline() -> anyhow::Result<()> {
        let mut mock = MockSubscriber::new();
        mock.expect_modify_ack_deadline()
            .return_once(|_| Ok(TonicResponse::from(())));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
        let transport = test_transport(endpoint).await?;
        let _ = Stub::modify_ack_deadline(
            &transport,
            crate::model::ModifyAckDeadlineRequest::new(),
            crate::RequestOptions::default(),
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn acknowledge() -> anyhow::Result<()> {
        let mut mock = MockSubscriber::new();
        mock.expect_acknowledge()
            .return_once(|_| Ok(TonicResponse::from(())));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
        let transport = test_transport(endpoint).await?;
        let _ = Stub::acknowledge(
            &transport,
            crate::model::AcknowledgeRequest::new(),
            crate::RequestOptions::default(),
        )
        .await?;
        Ok(())
    }
}