google-cloud-pubsub 1.1.0

Google Cloud Client Libraries for Rust - Pub/Sub
Documentation
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::Result;
use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse};
use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
use tokio::sync::mpsc::Receiver;

pub(super) trait TonicStreaming: std::fmt::Debug + Send + 'static {
    fn next_message(
        &mut self,
    ) -> impl Future<Output = TonicResult<Option<StreamingPullResponse>>> + Send;
}

/// An internal trait for mocking the transport layer.
#[async_trait::async_trait]
pub(super) trait Stub: std::fmt::Debug + Send + Sync {
    type Stream: Sized + std::fmt::Debug;
    async fn streaming_pull(
        &self,
        request_params: &str,
        request_rx: Receiver<StreamingPullRequest>,
        _options: crate::RequestOptions,
    ) -> Result<TonicResponse<Self::Stream>>;

    async fn modify_ack_deadline(
        &self,
        _req: crate::model::ModifyAckDeadlineRequest,
        _options: crate::RequestOptions,
    ) -> Result<crate::Response<()>>;

    async fn acknowledge(
        &self,
        _req: crate::model::AcknowledgeRequest,
        _options: crate::RequestOptions,
    ) -> Result<crate::Response<()>>;
}

#[cfg(test)]
pub(super) mod tests {
    use super::*;
    use tokio::sync::mpsc::Receiver;

    type MockStream = Receiver<TonicResult<StreamingPullResponse>>;

    // Allow us to mock a tonic stream in our unit tests, using an mpsc receiver
    impl TonicStreaming for MockStream {
        async fn next_message(&mut self) -> TonicResult<Option<StreamingPullResponse>> {
            self.recv().await.transpose()
        }
    }

    mockall::mock! {
        #[derive(Debug)]
        pub(in super::super) Stub {}
        #[async_trait::async_trait]
        impl Stub for Stub {
            type Stream = MockStream;
            async fn streaming_pull(
                &self,
                request_params: &str,
                request_rx: Receiver<StreamingPullRequest>,
                _options: crate::RequestOptions,
            ) -> Result<TonicResponse<MockStream>>;
            async fn modify_ack_deadline(&self,
                _req: crate::model::ModifyAckDeadlineRequest,
                _options: crate::RequestOptions,
            ) -> Result<crate::Response<()>>;
            async fn acknowledge(
                &self,
                _req: crate::model::AcknowledgeRequest,
                _options: crate::RequestOptions,
            ) -> Result<crate::Response<()>>;
        }
    }
}