#![allow(dead_code, unused)]
use mq_bridge::test_utils::{
run_performance_pipeline_test, setup_logging, PERF_TEST_MESSAGE_COUNT,
};
use std::env;
use std::time::Duration;
const CONFIG_YAML: &str = r#"
routes:
memory_to_grpc:
concurrency: 4
batch_size: 128
input:
memory: { topic: "test-in-grpc" }
output:
grpc:
url: "{grpc_url}"
topic: "grpc_to_memory"
grpc_to_memory:
concurrency: 4
batch_size: 128
input:
grpc:
url: "{grpc_url}"
topic: "grpc_to_memory"
output:
memory: { topic: "test-out-grpc", capacity: {out_capacity} }
"#;
pub async fn test_grpc_performance_pipeline() {
setup_logging();
let timeout = Duration::from_secs(60);
let res = tokio::time::timeout(timeout, test_grpc_client_mode()).await;
assert!(res.is_ok(), "test_grpc_client_mode timed out after 60s");
let res = tokio::time::timeout(timeout, test_grpc_server_mode()).await;
assert!(res.is_ok(), "test_grpc_server_mode timed out after 60s");
}
async fn test_grpc_client_mode() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let grpc_url = format!("http://{}", addr);
use mq_bridge::endpoints::grpc::proto;
use proto::bridge_server::{Bridge, BridgeServer};
use proto::{BridgeMessage, PublishResponse};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
struct MockBridge {
tx: broadcast::Sender<BridgeMessage>,
}
#[tonic::async_trait]
impl Bridge for MockBridge {
async fn publish(
&self,
request: Request<BridgeMessage>,
) -> Result<Response<PublishResponse>, Status> {
let msg = request.into_inner();
let msg_id = msg.id.clone();
let msg_topic = msg.metadata.get("mq_bridge.topic").cloned();
tracing::debug!(mock = "publish", msg_id = %msg_id, metadata = ?msg.metadata);
let res = self.tx.send(msg);
match res {
Ok(_) => {
tracing::trace!(mock = "publish", msg_id = %msg_id, topic = ?msg_topic, "enqueued to broadcast channel")
}
Err(e) => {
tracing::debug!(mock = "publish", msg_id = %msg_id, error = %format!("{:?}", e), "broadcast send failed")
}
}
Ok(Response::new(PublishResponse {
result: Some(proto::publish_response::Result::Ack(proto::Ack {
id: msg_id,
status: 0,
reason: String::new(),
metadata: Default::default(),
})),
}))
}
async fn acknowledge(
&self,
request: Request<proto::Ack>,
) -> Result<Response<proto::AckResponse>, Status> {
let _ = request.into_inner();
Ok(Response::new(proto::AckResponse {
success: true,
error: String::new(),
}))
}
type PublishBatchStream = ReceiverStream<Result<PublishResponse, Status>>;
async fn publish_batch(
&self,
request: Request<tonic::Streaming<BridgeMessage>>,
) -> Result<Response<Self::PublishBatchStream>, Status> {
let mut stream = request.into_inner();
let (tx, rx) = mpsc::channel(32);
let sender = self.tx.clone();
tokio::spawn(async move {
while let Ok(Some(msg)) = stream.message().await {
let msg_id = msg.id.clone();
let msg_topic = msg.metadata.get("mq_bridge.topic").cloned();
tracing::trace!(mock = "publish_batch", msg_id = %msg_id, metadata = ?msg.metadata);
let res = sender.send(msg);
match res {
Ok(_) => {
tracing::trace!(mock = "publish_batch", msg_id = %msg_id, topic = ?msg_topic, "enqueued to broadcast channel")
}
Err(e) => {
tracing::debug!(mock = "publish_batch", msg_id = %msg_id, error = %format!("{:?}", e), "broadcast send failed")
}
}
let resp = PublishResponse {
result: Some(proto::publish_response::Result::Ack(proto::Ack {
id: msg_id,
status: 0,
reason: String::new(),
metadata: Default::default(),
})),
};
if tx.send(Ok(resp)).await.is_err() {
break;
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
type SubscribeStream = ReceiverStream<Result<BridgeMessage, Status>>;
async fn subscribe(
&self,
request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let req = request.into_inner();
let topic_filter = req.topic;
let mut rx = self.tx.subscribe();
let (tx_stream, rx_stream) = mpsc::channel(10);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(msg) => {
let msg_topic = msg
.metadata
.get("mq_bridge.topic")
.map(|s| s.as_str())
.unwrap_or("");
if msg_topic == topic_filter && tx_stream.send(Ok(msg)).await.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
Ok(Response::new(ReceiverStream::new(rx_stream)))
}
}
use tokio_stream::wrappers::TcpListenerStream;
let (tx, _rx) = broadcast::channel(PERF_TEST_MESSAGE_COUNT + 1000);
let bridge = MockBridge { tx };
let incoming = TcpListenerStream::new(listener);
tracing::info!(mock = "server", grpc_url = %grpc_url, "starting mock gRPC server");
let grpc_url_clone = grpc_url.clone();
let _server_handle = tokio::spawn(async move {
if let Err(e) = tonic::transport::Server::builder()
.serve_with_incoming(BridgeServer::new(bridge), incoming)
.await
{
tracing::error!(mock = "server", grpc_url = %grpc_url_clone, error = %format!("{:?}", e), "mock gRPC server error");
}
});
let config_yaml = CONFIG_YAML.replace("{grpc_url}", &grpc_url).replace(
"{out_capacity}",
&(PERF_TEST_MESSAGE_COUNT + 1000).to_string(),
);
let msg_count = env::var("MQB_FOCUSED_COUNT")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(PERF_TEST_MESSAGE_COUNT);
run_performance_pipeline_test("grpc", &config_yaml, msg_count).await;
}
async fn test_grpc_server_mode() {
use mq_bridge::endpoints::grpc::proto;
use proto::bridge_server::{Bridge, BridgeServer};
use proto::{BridgeMessage, PublishResponse};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
struct MockBridge {
tx: broadcast::Sender<BridgeMessage>,
}
#[tonic::async_trait]
impl Bridge for MockBridge {
async fn publish(
&self,
request: Request<BridgeMessage>,
) -> Result<Response<PublishResponse>, Status> {
let msg = request.into_inner();
let msg_id = msg.id.clone();
let _ = self.tx.send(msg);
Ok(Response::new(PublishResponse {
result: Some(proto::publish_response::Result::Ack(proto::Ack {
id: msg_id,
status: 0,
reason: String::new(),
metadata: Default::default(),
})),
}))
}
async fn acknowledge(
&self,
request: Request<proto::Ack>,
) -> Result<Response<proto::AckResponse>, Status> {
let _ = request.into_inner();
Ok(Response::new(proto::AckResponse {
success: true,
error: String::new(),
}))
}
type PublishBatchStream = ReceiverStream<Result<PublishResponse, Status>>;
async fn publish_batch(
&self,
request: Request<tonic::Streaming<BridgeMessage>>,
) -> Result<Response<Self::PublishBatchStream>, Status> {
let mut stream = request.into_inner();
let (tx, rx) = mpsc::channel(32);
let sender = self.tx.clone();
tokio::spawn(async move {
while let Ok(Some(msg)) = stream.message().await {
let msg_id = msg.id.clone();
let _ = sender.send(msg);
let resp = PublishResponse {
result: Some(proto::publish_response::Result::Ack(proto::Ack {
id: msg_id,
status: 0,
reason: String::new(),
metadata: Default::default(),
})),
};
if tx.send(Ok(resp)).await.is_err() {
break;
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
type SubscribeStream = ReceiverStream<Result<BridgeMessage, Status>>;
async fn subscribe(
&self,
request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let req = request.into_inner();
let topic_filter = req.topic;
let mut rx = self.tx.subscribe();
let (tx_stream, rx_stream) = mpsc::channel(10);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(msg) => {
let msg_topic = msg
.metadata
.get("mq_bridge.topic")
.map(|s| s.as_str())
.unwrap_or("");
if msg_topic == topic_filter && tx_stream.send(Ok(msg)).await.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
Ok(Response::new(ReceiverStream::new(rx_stream)))
}
}
use tokio_stream::wrappers::TcpListenerStream;
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let grpc_url = format!("http://{}", addr);
let (tx, _rx) = broadcast::channel(PERF_TEST_MESSAGE_COUNT + 1000);
let bridge = MockBridge { tx };
let incoming = TcpListenerStream::new(listener);
tracing::info!(mock = "server", grpc_url = %grpc_url, "starting mock gRPC server (server mode)");
let grpc_url_clone = grpc_url.clone();
let _server_handle = tokio::spawn(async move {
if let Err(e) = tonic::transport::Server::builder()
.serve_with_incoming(BridgeServer::new(bridge), incoming)
.await
{
tracing::error!(mock = "server", grpc_url = %grpc_url_clone, error = %format!("{:?}", e), "mock gRPC server error (server mode)");
}
});
let config_yaml = CONFIG_YAML.replace("{grpc_url}", &grpc_url).replace(
"{out_capacity}",
&(PERF_TEST_MESSAGE_COUNT + 1000).to_string(),
);
let msg_count = env::var("MQB_FOCUSED_COUNT")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(PERF_TEST_MESSAGE_COUNT);
run_performance_pipeline_test("grpc", &config_yaml, msg_count).await;
}