spikard_http/grpc/
streaming.rs1use bytes::Bytes;
9use futures_util::Stream;
10use std::pin::Pin;
11use tonic::Status;
12
13pub type MessageStream = Pin<Box<dyn Stream<Item = Result<Bytes, Status>> + Send>>;
20
21pub struct StreamingRequest {
25 pub service_name: String,
27 pub method_name: String,
29 pub message_stream: MessageStream,
31 pub metadata: tonic::metadata::MetadataMap,
33}
34
35pub struct StreamingResponse {
39 pub message_stream: MessageStream,
41 pub metadata: tonic::metadata::MetadataMap,
43}
44
45pub fn message_stream_from_vec(messages: Vec<Bytes>) -> MessageStream {
64 Box::pin(futures_util::stream::iter(messages.into_iter().map(Ok)))
65}
66
67pub fn empty_message_stream() -> MessageStream {
71 Box::pin(futures_util::stream::empty())
72}
73
74pub fn single_message_stream(message: Bytes) -> MessageStream {
87 Box::pin(futures_util::stream::once(async move { Ok(message) }))
88}
89
90pub fn error_stream(status: Status) -> MessageStream {
103 Box::pin(futures_util::stream::once(async move { Err(status) }))
104}
105
106pub fn from_tonic_stream<S>(stream: S) -> MessageStream
111where
112 S: Stream<Item = Result<Bytes, Status>> + Send + 'static,
113{
114 Box::pin(stream)
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;

    /// Messages from a vector are yielded in order, then the stream ends.
    #[tokio::test]
    async fn test_message_stream_from_vec() {
        let messages = vec![Bytes::from("msg1"), Bytes::from("msg2"), Bytes::from("msg3")];

        // `messages` is not used again, so it is moved instead of cloned.
        let mut stream = message_stream_from_vec(messages);

        let msg1 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg1, Bytes::from("msg1"));

        let msg2 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg2, Bytes::from("msg2"));

        let msg3 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg3, Bytes::from("msg3"));

        assert!(stream.next().await.is_none());
    }

    /// The empty stream ends without yielding anything.
    #[tokio::test]
    async fn test_empty_message_stream() {
        let mut stream = empty_message_stream();
        assert!(stream.next().await.is_none());
    }

    /// A single-message stream yields exactly one `Ok` item.
    #[tokio::test]
    async fn test_single_message_stream() {
        let mut stream = single_message_stream(Bytes::from("single"));

        let msg = stream.next().await.unwrap().unwrap();
        assert_eq!(msg, Bytes::from("single"));

        assert!(stream.next().await.is_none());
    }

    /// An error stream yields the given status once, then ends.
    #[tokio::test]
    async fn test_error_stream() {
        let mut stream = error_stream(Status::internal("test error"));

        let result = stream.next().await.unwrap();
        assert!(result.is_err());

        let error = result.unwrap_err();
        assert_eq!(error.code(), tonic::Code::Internal);
        assert_eq!(error.message(), "test error");

        assert!(stream.next().await.is_none());
    }

    /// An empty vector produces a stream that ends immediately.
    #[tokio::test]
    async fn test_message_stream_from_vec_empty() {
        let messages: Vec<Bytes> = vec![];
        let mut stream = message_stream_from_vec(messages);
        assert!(stream.next().await.is_none());
    }

    /// Ordering is preserved for a larger batch of messages.
    #[tokio::test]
    async fn test_message_stream_from_vec_large() {
        let messages: Vec<Bytes> = (0..100)
            .map(|i| Bytes::from(format!("message{}", i)))
            .collect();

        let mut stream = message_stream_from_vec(messages);

        for i in 0..100 {
            let msg = stream.next().await.unwrap().unwrap();
            assert_eq!(msg, Bytes::from(format!("message{}", i)));
        }

        assert!(stream.next().await.is_none());
    }

    /// Both `Ok` and `Err` items pass through `from_tonic_stream` unchanged.
    #[tokio::test]
    async fn test_from_tonic_stream() {
        let messages = vec![Ok(Bytes::from("a")), Ok(Bytes::from("b")), Err(Status::cancelled("done"))];

        let tonic_stream = futures_util::stream::iter(messages);
        let mut stream = from_tonic_stream(tonic_stream);

        let msg1 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg1, Bytes::from("a"));

        let msg2 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg2, Bytes::from("b"));

        let result = stream.next().await.unwrap();
        assert!(result.is_err());

        assert!(stream.next().await.is_none());
    }

    /// A `StreamingRequest` can be built field by field and keeps its names.
    #[test]
    fn test_streaming_request_creation() {
        let stream = empty_message_stream();
        let request = StreamingRequest {
            service_name: "test.Service".to_string(),
            method_name: "StreamMethod".to_string(),
            message_stream: stream,
            metadata: tonic::metadata::MetadataMap::new(),
        };

        assert_eq!(request.service_name, "test.Service");
        assert_eq!(request.method_name, "StreamMethod");
    }

    /// A `StreamingResponse` built with fresh metadata has an empty map.
    #[test]
    fn test_streaming_response_creation() {
        let stream = empty_message_stream();
        let response = StreamingResponse {
            message_stream: stream,
            metadata: tonic::metadata::MetadataMap::new(),
        };

        assert!(response.metadata.is_empty());
    }
}