use super::stub::{Stub, TonicStreaming};
use crate::Result;
use crate::generated::gapic_dataplane::stub::dynamic::Subscriber as GapicStub;
pub(super) use crate::generated::gapic_dataplane::transport::Subscriber as Transport;
use crate::google::pubsub::v1::{StreamingPullRequest, StreamingPullResponse};
use gaxi::grpc::tonic::{Response as TonicResponse, Result as TonicResult, Streaming};
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::ReceiverStream;
mod info {
use std::sync::LazyLock;
const NAME: &str = env!("CARGO_PKG_NAME");
const VERSION: &str = env!("CARGO_PKG_VERSION");
pub(super) static X_GOOG_API_CLIENT_HEADER: LazyLock<String> = LazyLock::new(|| {
let ac = gaxi::api_header::XGoogApiClient {
name: NAME,
version: VERSION,
library_type: gaxi::api_header::GCCL,
};
ac.grpc_header_value()
});
}
impl TonicStreaming for Streaming<StreamingPullResponse> {
async fn next_message(&mut self) -> TonicResult<Option<StreamingPullResponse>> {
self.message().await
}
}
#[async_trait::async_trait]
impl Stub for Transport {
type Stream = Streaming<StreamingPullResponse>;
async fn streaming_pull(
&self,
request_params: &str,
request_rx: Receiver<StreamingPullRequest>,
options: crate::RequestOptions,
) -> Result<TonicResponse<Self::Stream>> {
use gaxi::grpc::tonic::{Extensions, GrpcMethod};
let request = ReceiverStream::new(request_rx);
let extensions = {
let mut e = Extensions::new();
e.insert(GrpcMethod::new(
"google.pubsub.v1.Subscriber",
"StreamingPull",
));
e
};
let path =
http::uri::PathAndQuery::from_static("/google.pubsub.v1.Subscriber/StreamingPull");
self.inner
.bidi_stream(
extensions,
path,
request,
options,
&info::X_GOOG_API_CLIENT_HEADER,
request_params,
)
.await
}
async fn modify_ack_deadline(
&self,
req: crate::model::ModifyAckDeadlineRequest,
options: crate::RequestOptions,
) -> Result<crate::Response<()>> {
GapicStub::modify_ack_deadline(self, req, options).await
}
async fn acknowledge(
&self,
req: crate::model::AcknowledgeRequest,
options: crate::RequestOptions,
) -> Result<crate::Response<()>> {
GapicStub::acknowledge(self, req, options).await
}
}
#[cfg(test)]
pub(super) mod tests {
use super::*;
use crate::google::pubsub::v1::ReceivedMessage;
use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
use pubsub_grpc_mock::google::pubsub::v1;
use pubsub_grpc_mock::{MockSubscriber, start};
async fn test_transport(endpoint: String) -> anyhow::Result<Transport> {
let mut config = gaxi::options::ClientConfig::default();
config.cred = Some(Anonymous::new().build());
config.endpoint = Some(endpoint);
Ok(Transport::new(config).await?)
}
fn convert(pb: &StreamingPullResponse) -> v1::StreamingPullResponse {
use prost::Message;
let v = pb.encode_to_vec();
v1::StreamingPullResponse::decode(v.as_slice()).expect("encoding is always valid.")
}
#[tokio::test]
async fn streaming_pull() -> anyhow::Result<()> {
let (response_tx, response_rx) = tokio::sync::mpsc::channel(1);
let expected = StreamingPullResponse {
received_messages: vec![ReceivedMessage {
ack_id: "test-ack-id".to_string(),
..Default::default()
}],
..Default::default()
};
response_tx.send(Ok(convert(&expected))).await?;
let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);
request_tx.send(StreamingPullRequest::default()).await?;
let mut mock = MockSubscriber::new();
mock.expect_streaming_pull().return_once(|request| {
let metadata = request.metadata();
assert_eq!(
metadata
.get("x-goog-request-params")
.expect("routing header missing"),
"subscription=projects/p/subscriptions/s"
);
Ok(TonicResponse::from(response_rx))
});
let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
let transport = test_transport(endpoint).await?;
let mut stream = Stub::streaming_pull(
&transport,
"subscription=projects/p/subscriptions/s",
request_rx,
crate::RequestOptions::default(),
)
.await?
.into_inner();
assert_eq!(stream.next_message().await?, Some(expected));
drop(response_tx);
assert_eq!(stream.next_message().await?, None);
Ok(())
}
#[tokio::test]
async fn modify_ack_deadline() -> anyhow::Result<()> {
let mut mock = MockSubscriber::new();
mock.expect_modify_ack_deadline()
.return_once(|_| Ok(TonicResponse::from(())));
let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
let transport = test_transport(endpoint).await?;
let _ = Stub::modify_ack_deadline(
&transport,
crate::model::ModifyAckDeadlineRequest::new(),
crate::RequestOptions::default(),
)
.await?;
Ok(())
}
#[tokio::test]
async fn acknowledge() -> anyhow::Result<()> {
let mut mock = MockSubscriber::new();
mock.expect_acknowledge()
.return_once(|_| Ok(TonicResponse::from(())));
let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
let transport = test_transport(endpoint).await?;
let _ = Stub::acknowledge(
&transport,
crate::model::AcknowledgeRequest::new(),
crate::RequestOptions::default(),
)
.await?;
Ok(())
}
}