use bytes::Bytes;
use futures_util::Stream;
use std::pin::Pin;
use tonic::Status;
/// A pinned, boxed, `Send` stream whose items are `Result<Bytes, Status>`.
///
/// Each item is either one message payload as raw [`Bytes`] or a
/// [`tonic::Status`] error. The concrete stream type is erased behind
/// `dyn Stream`.
pub type MessageStream = Pin<Box<dyn Stream<Item = Result<Bytes, Status>> + Send>>;
/// A request whose body is a [`MessageStream`], together with the target
/// service/method names and request metadata.
pub struct StreamingRequest {
/// Name of the target service (the tests in this file use `"test.Service"`).
// NOTE(review): presumably the fully-qualified gRPC service name — confirm against callers.
pub service_name: String,
/// Name of the target method (the tests in this file use `"StreamMethod"`).
pub method_name: String,
/// Stream of message payloads carried by this request.
pub message_stream: MessageStream,
/// Metadata attached to the request.
pub metadata: tonic::metadata::MetadataMap,
}
/// A response whose body is a [`MessageStream`], together with response
/// metadata and optional trailers.
pub struct StreamingResponse {
/// Stream of message payloads carried by this response.
pub message_stream: MessageStream,
/// Metadata attached to the response.
pub metadata: tonic::metadata::MetadataMap,
/// Optional trailing metadata; `None` when no trailers are provided.
// NOTE(review): presumably sent after the message stream completes — confirm against the transport layer.
pub trailers: Option<tonic::metadata::MetadataMap>,
}
pub fn message_stream_from_vec(messages: Vec<Bytes>) -> MessageStream {
Box::pin(futures_util::stream::iter(messages.into_iter().map(Ok)))
}
pub fn empty_message_stream() -> MessageStream {
Box::pin(futures_util::stream::empty())
}
pub fn single_message_stream(message: Bytes) -> MessageStream {
Box::pin(futures_util::stream::once(async move { Ok(message) }))
}
pub fn error_stream(status: Status) -> MessageStream {
Box::pin(futures_util::stream::once(async move { Err(status) }))
}
/// Boxes and pins `stream`, erasing its concrete type into a
/// [`MessageStream`].
///
/// Items are passed through unchanged; this function only pins and boxes
/// the stream.
pub fn from_tonic_stream<S>(stream: S) -> MessageStream
where
    S: Stream<Item = Result<Bytes, Status>> + Send + 'static,
{
    // The annotated binding makes the unsizing coercion to `dyn Stream` explicit.
    let erased: MessageStream = Box::pin(stream);
    erased
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;

    /// A vector of messages is replayed in order and the stream then ends.
    #[tokio::test]
    async fn test_message_stream_from_vec() {
        let messages = vec![Bytes::from("msg1"), Bytes::from("msg2"), Bytes::from("msg3")];
        // The vector is not used afterwards, so it is moved rather than cloned.
        let mut stream = message_stream_from_vec(messages);

        let msg1 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg1, Bytes::from("msg1"));
        let msg2 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg2, Bytes::from("msg2"));
        let msg3 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg3, Bytes::from("msg3"));

        assert!(stream.next().await.is_none());
    }

    /// The empty stream terminates without yielding anything.
    #[tokio::test]
    async fn test_empty_message_stream() {
        let mut stream = empty_message_stream();
        assert!(stream.next().await.is_none());
    }

    /// A single-message stream yields its message once and then ends.
    #[tokio::test]
    async fn test_single_message_stream() {
        let mut stream = single_message_stream(Bytes::from("single"));

        let msg = stream.next().await.unwrap().unwrap();
        assert_eq!(msg, Bytes::from("single"));

        assert!(stream.next().await.is_none());
    }

    /// An error stream yields the given status once and then ends.
    #[tokio::test]
    async fn test_error_stream() {
        let mut stream = error_stream(Status::internal("test error"));

        let result = stream.next().await.unwrap();
        assert!(result.is_err());
        let error = result.unwrap_err();
        assert_eq!(error.code(), tonic::Code::Internal);
        assert_eq!(error.message(), "test error");

        assert!(stream.next().await.is_none());
    }

    /// An empty vector produces a stream that ends immediately.
    #[tokio::test]
    async fn test_message_stream_from_vec_empty() {
        let messages: Vec<Bytes> = vec![];
        let mut stream = message_stream_from_vec(messages);
        assert!(stream.next().await.is_none());
    }

    /// Ordering is preserved across a larger batch of messages.
    #[tokio::test]
    async fn test_message_stream_from_vec_large() {
        let messages: Vec<Bytes> = (0..100)
            .map(|i| Bytes::from(format!("message{}", i)))
            .collect();
        let mut stream = message_stream_from_vec(messages);

        for i in 0..100 {
            let msg = stream.next().await.unwrap().unwrap();
            assert_eq!(msg, Bytes::from(format!("message{}", i)));
        }

        assert!(stream.next().await.is_none());
    }

    /// Items of a wrapped stream, including errors, pass through unchanged.
    #[tokio::test]
    async fn test_from_tonic_stream() {
        let messages = vec![
            Ok(Bytes::from("a")),
            Ok(Bytes::from("b")),
            Err(Status::cancelled("done")),
        ];
        let tonic_stream = futures_util::stream::iter(messages);
        let mut stream = from_tonic_stream(tonic_stream);

        let msg1 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg1, Bytes::from("a"));
        let msg2 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg2, Bytes::from("b"));

        // Check the status itself, not merely that an error occurred, so a
        // wrapper that rewrote the error would be caught.
        let result = stream.next().await.unwrap();
        assert!(result.is_err());
        let error = result.unwrap_err();
        assert_eq!(error.code(), tonic::Code::Cancelled);
        assert_eq!(error.message(), "done");

        assert!(stream.next().await.is_none());
    }

    /// A request can be built from its parts and exposes them as fields.
    #[test]
    fn test_streaming_request_creation() {
        let stream = empty_message_stream();
        let request = StreamingRequest {
            service_name: "test.Service".to_string(),
            method_name: "StreamMethod".to_string(),
            message_stream: stream,
            metadata: tonic::metadata::MetadataMap::new(),
        };

        assert_eq!(request.service_name, "test.Service");
        assert_eq!(request.method_name, "StreamMethod");
        assert!(request.metadata.is_empty());
    }

    /// A response without trailers reports empty metadata and `None` trailers.
    #[test]
    fn test_streaming_response_creation() {
        let stream = empty_message_stream();
        let response = StreamingResponse {
            message_stream: stream,
            metadata: tonic::metadata::MetadataMap::new(),
            trailers: None,
        };

        assert!(response.metadata.is_empty());
        assert!(response.trailers.is_none());
    }

    /// Trailers supplied at construction are retained on the response.
    #[test]
    fn test_streaming_response_with_trailers() {
        let stream = empty_message_stream();
        let mut trailers = tonic::metadata::MetadataMap::new();
        trailers.insert(
            "x-request-id",
            "test-123"
                .parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>()
                .unwrap(),
        );
        let response = StreamingResponse {
            message_stream: stream,
            metadata: tonic::metadata::MetadataMap::new(),
            trailers: Some(trailers),
        };

        assert!(response.metadata.is_empty());
        assert!(response.trailers.is_some());
        let trailers = response.trailers.unwrap();
        assert_eq!(trailers.len(), 1);
    }
}