use crate::Result;
use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse};
use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult};
use tokio::sync::mpsc::Receiver;
// NOTE(review): the name suggests the production implementation wraps
// `tonic::Streaming` — confirm against the transport code, it is not visible here.
/// Abstraction over a stream that yields [`StreamingPullResponse`] messages.
///
/// Implementors must be `Debug`, `Send` and `'static`. The test module in this
/// file implements the trait for a `tokio` mpsc `Receiver`.
pub(super) trait TonicStreaming: std::fmt::Debug + Send + 'static {
/// Waits for the next item of the stream.
///
/// The returned future must be `Send`. It resolves to:
/// - `Ok(Some(response))` when a message was received,
/// - `Err(status)` when the stream produced an error,
/// - `Ok(None)` when there is nothing more to read (the in-file test
///   implementation returns this once its channel is closed).
fn next_message(
&mut self,
) -> impl Future<Output = TonicResult<Option<StreamingPullResponse>>> + Send;
}
/// Internal interface to the RPCs declared here: `streaming_pull`,
/// `modify_ack_deadline` and `acknowledge`.
///
/// Keeping the calls behind a trait lets tests substitute a mock; the test
/// module in this file generates one with `mockall`.
#[async_trait::async_trait]
pub(super) trait Stub: std::fmt::Debug + Send + Sync {
/// Type of the response stream carried by a successful `streaming_pull` call.
// NOTE(review): this associated type is not bounded by `TonicStreaming` here;
// presumably callers add that bound where they consume the stream — confirm.
type Stream: Sized + std::fmt::Debug;
/// Starts a streaming pull.
///
/// * `request_params` — NOTE(review): presumably the routing-header value
///   sent with the call; confirm against the implementation.
/// * `request_rx` — receiving half of the channel of [`StreamingPullRequest`]
///   messages; presumably these are forwarded to the service as the request
///   stream — confirm against the implementation.
/// * `_options` — per-call request options.
///
/// On success returns the tonic response wrapping `Self::Stream`.
async fn streaming_pull(
&self,
request_params: &str,
request_rx: Receiver<StreamingPullRequest>,
_options: crate::RequestOptions,
) -> Result<TonicResponse<Self::Stream>>;
/// Sends a `ModifyAckDeadlineRequest`; a successful response has an empty body.
async fn modify_ack_deadline(
&self,
_req: crate::model::ModifyAckDeadlineRequest,
_options: crate::RequestOptions,
) -> Result<crate::Response<()>>;
/// Sends an `AcknowledgeRequest`; a successful response has an empty body.
async fn acknowledge(
&self,
_req: crate::model::AcknowledgeRequest,
_options: crate::RequestOptions,
) -> Result<crate::Response<()>>;
}
/// Test doubles for the stub layer: a channel-backed stream and a `mockall`
/// mock of [`Stub`].
#[cfg(test)]
pub(super) mod tests {
    use super::*;
    use tokio::sync::mpsc::Receiver;

    /// Stream used by the mock stub: a channel receiver whose items are either
    /// a response or a tonic error status.
    type MockStream = Receiver<TonicResult<StreamingPullResponse>>;

    impl TonicStreaming for MockStream {
        /// Reads the next item from the channel.
        ///
        /// A received response becomes `Ok(Some(_))`, a received error is
        /// returned as `Err(_)`, and a closed channel becomes `Ok(None)`.
        async fn next_message(&mut self) -> TonicResult<Option<StreamingPullResponse>> {
            match self.recv().await {
                Some(Ok(response)) => Ok(Some(response)),
                Some(Err(status)) => Err(status),
                None => Ok(None),
            }
        }
    }

    mockall::mock! {
        // Mock of `Stub`, visible to the grandparent module. The method list
        // below must stay in sync with the trait declaration.
        #[derive(Debug)]
        pub(in super::super) Stub {}

        #[async_trait::async_trait]
        impl Stub for Stub {
            type Stream = MockStream;

            async fn streaming_pull(
                &self,
                request_params: &str,
                request_rx: Receiver<StreamingPullRequest>,
                _options: crate::RequestOptions,
            ) -> Result<TonicResponse<MockStream>>;

            async fn modify_ack_deadline(
                &self,
                _req: crate::model::ModifyAckDeadlineRequest,
                _options: crate::RequestOptions,
            ) -> Result<crate::Response<()>>;

            async fn acknowledge(
                &self,
                _req: crate::model::AcknowledgeRequest,
                _options: crate::RequestOptions,
            ) -> Result<crate::Response<()>>;
        }
    }
}