spikard_http/grpc/streaming.rs
1//! Streaming support utilities for gRPC
2//!
3//! This module provides utilities for handling streaming RPCs:
4//! - Client streaming (receiving stream of messages)
5//! - Server streaming (sending stream of messages)
6//! - Bidirectional streaming (both directions)
7
8use bytes::Bytes;
9use futures_util::Stream;
10use std::pin::Pin;
11use tonic::Status;
12
13/// Type alias for a stream of protobuf message bytes
14///
15/// Used for both client streaming (incoming) and server streaming (outgoing).
16/// Each item in the stream is either:
17/// - Ok(Bytes): A serialized protobuf message
18/// - Err(Status): A gRPC error
19///
20/// # Backpressure Considerations
21///
22/// Streaming responses should implement backpressure handling to avoid memory buildup with slow clients:
23///
24/// - **Problem**: If a client reads slowly but the handler produces messages quickly, messages will
25/// queue in memory, potentially causing high memory usage or OOM errors.
26/// - **Solution**: The gRPC layer (Tonic) handles backpressure automatically via the underlying TCP/HTTP/2
27/// connection. However, handlers should be aware of this behavior.
28/// - **Best Practice**: For long-running or high-volume streams, implement rate limiting or flow control
29/// in the handler to avoid overwhelming the network buffer.
30///
31/// # Example: Rate-limited streaming
32///
33/// ```ignore
34/// use spikard_http::grpc::streaming::MessageStream;
35/// use bytes::Bytes;
36/// use std::pin::Pin;
37/// use std::time::Duration;
38/// use tokio::time::sleep;
39/// use futures_util::stream::{self, StreamExt};
40///
41/// // Handler that sends 1000 messages with rate limiting
42/// fn create_rate_limited_stream() -> MessageStream {
43/// let messages = (0..1000).map(|i| {
44/// Ok(Bytes::from(format!("message_{}", i)))
45/// });
46///
47/// // Stream with delay between messages to avoid overwhelming the client
48/// let stream = stream::iter(messages)
49/// .then(|msg| async {
50/// sleep(Duration::from_millis(1)).await; // 1ms between messages
51/// msg
52/// });
53///
54/// Box::pin(stream)
55/// }
56/// ```
57///
58/// # Memory Management
59///
60/// Keep the following in mind when implementing large streams:
61///
62/// - Messages are buffered in the gRPC transport layer's internal queue
63/// - Slow clients will cause the queue to grow, increasing memory usage
64/// - Very large individual messages may cause buffer allocation spikes
65/// - Consider implementing stream chunking for very large responses (split one large message into many small ones)
66pub type MessageStream = Pin<Box<dyn Stream<Item = Result<Bytes, Status>> + Send>>;
67
68/// Request for client streaming RPC
69///
70/// Contains metadata and a stream of incoming messages from the client.
71pub struct StreamingRequest {
72 /// Service name
73 pub service_name: String,
74 /// Method name
75 pub method_name: String,
76 /// Stream of incoming protobuf messages
77 pub message_stream: MessageStream,
78 /// Request metadata
79 pub metadata: tonic::metadata::MetadataMap,
80}
81
82/// Response for server streaming RPC
83///
84/// Contains metadata, a stream of outgoing messages, and optional trailers.
85/// Trailers are metadata sent after the stream completes (after all messages).
86pub struct StreamingResponse {
87 /// Stream of outgoing protobuf messages
88 pub message_stream: MessageStream,
89 /// Response metadata (sent before messages)
90 pub metadata: tonic::metadata::MetadataMap,
91 /// Optional trailers (sent after stream completes)
92 ///
93 /// Trailers are useful for sending status information or metrics
94 /// after all messages have been sent.
95 pub trailers: Option<tonic::metadata::MetadataMap>,
96}
97
98/// Helper to create a message stream from a vector of bytes
99///
100/// Useful for testing and for handlers that want to create a stream
101/// from a fixed set of messages.
102///
103/// # Example
104///
105/// ```ignore
106/// use spikard_http::grpc::streaming::message_stream_from_vec;
107/// use bytes::Bytes;
108///
109/// let messages = vec![
110/// Bytes::from("message1"),
111/// Bytes::from("message2"),
112/// ];
113///
114/// let stream = message_stream_from_vec(messages);
115/// ```
116pub fn message_stream_from_vec(messages: Vec<Bytes>) -> MessageStream {
117 Box::pin(futures_util::stream::iter(messages.into_iter().map(Ok)))
118}
119
120/// Helper to create an empty message stream
121///
122/// Useful for testing or for handlers that need to return an empty stream.
123pub fn empty_message_stream() -> MessageStream {
124 Box::pin(futures_util::stream::empty())
125}
126
127/// Helper to create a single-message stream
128///
129/// Useful for converting unary responses to streaming responses.
130///
131/// # Example
132///
133/// ```ignore
134/// use spikard_http::grpc::streaming::single_message_stream;
135/// use bytes::Bytes;
136///
137/// let stream = single_message_stream(Bytes::from("response"));
138/// ```
139pub fn single_message_stream(message: Bytes) -> MessageStream {
140 Box::pin(futures_util::stream::once(async move { Ok(message) }))
141}
142
143/// Helper to create an error stream
144///
145/// Returns a stream that immediately yields a gRPC error.
146///
147/// # Example
148///
149/// ```ignore
150/// use spikard_http::grpc::streaming::error_stream;
151/// use tonic::Status;
152///
153/// let stream = error_stream(Status::internal("Something went wrong"));
154/// ```
155pub fn error_stream(status: Status) -> MessageStream {
156 Box::pin(futures_util::stream::once(async move { Err(status) }))
157}
158
159/// Helper to convert a Tonic ReceiverStream to our MessageStream
160///
161/// This is used in the service bridge to convert Tonic's streaming types
162/// to our internal representation.
163pub fn from_tonic_stream<S>(stream: S) -> MessageStream
164where
165 S: Stream<Item = Result<Bytes, Status>> + Send + 'static,
166{
167 Box::pin(stream)
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;

    /// Drains `stream` to completion and returns the payloads in order.
    ///
    /// Panics if the stream yields an error, so it is only suitable for
    /// streams that are expected to contain `Ok` items exclusively.
    async fn collect_ok(mut stream: MessageStream) -> Vec<Bytes> {
        let mut payloads = Vec::new();
        while let Some(item) = stream.next().await {
            payloads.push(item.expect("stream yielded an unexpected error"));
        }
        payloads
    }

    #[tokio::test]
    async fn test_message_stream_from_vec() {
        let stream = message_stream_from_vec(vec![
            Bytes::from("msg1"),
            Bytes::from("msg2"),
            Bytes::from("msg3"),
        ]);

        let received = collect_ok(stream).await;

        assert_eq!(
            received,
            vec![Bytes::from("msg1"), Bytes::from("msg2"), Bytes::from("msg3")]
        );
    }

    #[tokio::test]
    async fn test_empty_message_stream() {
        let mut stream = empty_message_stream();
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_single_message_stream() {
        let mut stream = single_message_stream(Bytes::from("single"));

        let msg = stream
            .next()
            .await
            .expect("stream should yield one item")
            .expect("the item should be Ok");
        assert_eq!(msg, Bytes::from("single"));

        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_error_stream() {
        let mut stream = error_stream(Status::internal("test error"));

        let error = stream
            .next()
            .await
            .expect("stream should yield one item")
            .expect_err("the item should be an error");
        assert_eq!(error.code(), tonic::Code::Internal);
        assert_eq!(error.message(), "test error");

        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_message_stream_from_vec_empty() {
        let mut stream = message_stream_from_vec(Vec::new());
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_message_stream_from_vec_large() {
        let messages: Vec<Bytes> = (0..100)
            .map(|i| Bytes::from(format!("message{}", i)))
            .collect();

        // The clone is needed here: the original vector is the expected value.
        let stream = message_stream_from_vec(messages.clone());

        assert_eq!(collect_ok(stream).await, messages);
    }

    #[tokio::test]
    async fn test_from_tonic_stream() {
        let messages = vec![
            Ok(Bytes::from("a")),
            Ok(Bytes::from("b")),
            Err(Status::cancelled("done")),
        ];

        let tonic_stream = futures_util::stream::iter(messages);
        let mut stream = from_tonic_stream(tonic_stream);

        let msg1 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg1, Bytes::from("a"));

        let msg2 = stream.next().await.unwrap().unwrap();
        assert_eq!(msg2, Bytes::from("b"));

        // The error must pass through the conversion unchanged.
        let error = stream
            .next()
            .await
            .expect("stream should yield a third item")
            .expect_err("the third item should be an error");
        assert_eq!(error.code(), tonic::Code::Cancelled);
        assert_eq!(error.message(), "done");

        assert!(stream.next().await.is_none());
    }

    #[test]
    fn test_streaming_request_creation() {
        let request = StreamingRequest {
            service_name: "test.Service".to_string(),
            method_name: "StreamMethod".to_string(),
            message_stream: empty_message_stream(),
            metadata: tonic::metadata::MetadataMap::new(),
        };

        assert_eq!(request.service_name, "test.Service");
        assert_eq!(request.method_name, "StreamMethod");
        assert!(request.metadata.is_empty());
    }

    #[test]
    fn test_streaming_response_creation() {
        let response = StreamingResponse {
            message_stream: empty_message_stream(),
            metadata: tonic::metadata::MetadataMap::new(),
            trailers: None,
        };

        assert!(response.metadata.is_empty());
        assert!(response.trailers.is_none());
    }

    #[test]
    fn test_streaming_response_with_trailers() {
        let mut trailers = tonic::metadata::MetadataMap::new();
        trailers.insert(
            "x-request-id",
            "test-123"
                .parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>()
                .unwrap(),
        );

        let response = StreamingResponse {
            message_stream: empty_message_stream(),
            metadata: tonic::metadata::MetadataMap::new(),
            trailers: Some(trailers),
        };

        assert!(response.metadata.is_empty());
        let trailers = response.trailers.expect("trailers should be present");
        assert_eq!(trailers.len(), 1);
        assert_eq!(
            trailers.get("x-request-id").and_then(|value| value.to_str().ok()),
            Some("test-123")
        );
    }
}