spikard_http/grpc/
streaming.rs

1//! Streaming support utilities for gRPC
2//!
3//! This module provides utilities for handling streaming RPCs:
4//! - Client streaming (receiving stream of messages)
5//! - Server streaming (sending stream of messages)
6//! - Bidirectional streaming (both directions)
7
8use bytes::Bytes;
9use futures_util::Stream;
10use std::pin::Pin;
11use tonic::Status;
12
13/// Type alias for a stream of protobuf message bytes
14///
15/// Used for both client streaming (incoming) and server streaming (outgoing).
16/// Each item in the stream is either:
17/// - Ok(Bytes): A serialized protobuf message
18/// - Err(Status): A gRPC error
19///
20/// # Backpressure Considerations
21///
22/// Streaming responses should implement backpressure handling to avoid memory buildup with slow clients:
23///
24/// - **Problem**: If a client reads slowly but the handler produces messages quickly, messages will
25///   queue in memory, potentially causing high memory usage or OOM errors.
26/// - **Solution**: The gRPC layer (Tonic) handles backpressure automatically via the underlying TCP/HTTP/2
27///   connection. However, handlers should be aware of this behavior.
28/// - **Best Practice**: For long-running or high-volume streams, implement rate limiting or flow control
29///   in the handler to avoid overwhelming the network buffer.
30///
31/// # Example: Rate-limited streaming
32///
33/// ```ignore
34/// use spikard_http::grpc::streaming::MessageStream;
35/// use bytes::Bytes;
36/// use std::pin::Pin;
37/// use std::time::Duration;
38/// use tokio::time::sleep;
39/// use futures_util::stream::{self, StreamExt};
40///
41/// // Handler that sends 1000 messages with rate limiting
42/// fn create_rate_limited_stream() -> MessageStream {
43///     let messages = (0..1000).map(|i| {
44///         Ok(Bytes::from(format!("message_{}", i)))
45///     });
46///
47///     // Stream with delay between messages to avoid overwhelming the client
48///     let stream = stream::iter(messages)
49///         .then(|msg| async {
50///             sleep(Duration::from_millis(1)).await;  // 1ms between messages
51///             msg
52///         });
53///
54///     Box::pin(stream)
55/// }
56/// ```
57///
58/// # Memory Management
59///
60/// Keep the following in mind when implementing large streams:
61///
62/// - Messages are buffered in the gRPC transport layer's internal queue
63/// - Slow clients will cause the queue to grow, increasing memory usage
64/// - Very large individual messages may cause buffer allocation spikes
65/// - Consider implementing stream chunking for very large responses (split one large message into many small ones)
66pub type MessageStream = Pin<Box<dyn Stream<Item = Result<Bytes, Status>> + Send>>;
67
68/// Request for client streaming RPC
69///
70/// Contains metadata and a stream of incoming messages from the client.
71pub struct StreamingRequest {
72    /// Service name
73    pub service_name: String,
74    /// Method name
75    pub method_name: String,
76    /// Stream of incoming protobuf messages
77    pub message_stream: MessageStream,
78    /// Request metadata
79    pub metadata: tonic::metadata::MetadataMap,
80}
81
82/// Response for server streaming RPC
83///
84/// Contains metadata, a stream of outgoing messages, and optional trailers.
85/// Trailers are metadata sent after the stream completes (after all messages).
86pub struct StreamingResponse {
87    /// Stream of outgoing protobuf messages
88    pub message_stream: MessageStream,
89    /// Response metadata (sent before messages)
90    pub metadata: tonic::metadata::MetadataMap,
91    /// Optional trailers (sent after stream completes)
92    ///
93    /// Trailers are useful for sending status information or metrics
94    /// after all messages have been sent.
95    pub trailers: Option<tonic::metadata::MetadataMap>,
96}
97
98/// Helper to create a message stream from a vector of bytes
99///
100/// Useful for testing and for handlers that want to create a stream
101/// from a fixed set of messages.
102///
103/// # Example
104///
105/// ```ignore
106/// use spikard_http::grpc::streaming::message_stream_from_vec;
107/// use bytes::Bytes;
108///
109/// let messages = vec![
110///     Bytes::from("message1"),
111///     Bytes::from("message2"),
112/// ];
113///
114/// let stream = message_stream_from_vec(messages);
115/// ```
116pub fn message_stream_from_vec(messages: Vec<Bytes>) -> MessageStream {
117    Box::pin(futures_util::stream::iter(messages.into_iter().map(Ok)))
118}
119
120/// Helper to create an empty message stream
121///
122/// Useful for testing or for handlers that need to return an empty stream.
123pub fn empty_message_stream() -> MessageStream {
124    Box::pin(futures_util::stream::empty())
125}
126
127/// Helper to create a single-message stream
128///
129/// Useful for converting unary responses to streaming responses.
130///
131/// # Example
132///
133/// ```ignore
134/// use spikard_http::grpc::streaming::single_message_stream;
135/// use bytes::Bytes;
136///
137/// let stream = single_message_stream(Bytes::from("response"));
138/// ```
139pub fn single_message_stream(message: Bytes) -> MessageStream {
140    Box::pin(futures_util::stream::once(async move { Ok(message) }))
141}
142
143/// Helper to create an error stream
144///
145/// Returns a stream that immediately yields a gRPC error.
146///
147/// # Example
148///
149/// ```ignore
150/// use spikard_http::grpc::streaming::error_stream;
151/// use tonic::Status;
152///
153/// let stream = error_stream(Status::internal("Something went wrong"));
154/// ```
155pub fn error_stream(status: Status) -> MessageStream {
156    Box::pin(futures_util::stream::once(async move { Err(status) }))
157}
158
159/// Helper to convert a Tonic ReceiverStream to our MessageStream
160///
161/// This is used in the service bridge to convert Tonic's streaming types
162/// to our internal representation.
163pub fn from_tonic_stream<S>(stream: S) -> MessageStream
164where
165    S: Stream<Item = Result<Bytes, Status>> + Send + 'static,
166{
167    Box::pin(stream)
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173    use futures_util::StreamExt;
174
175    #[tokio::test]
176    async fn test_message_stream_from_vec() {
177        let messages = vec![Bytes::from("msg1"), Bytes::from("msg2"), Bytes::from("msg3")];
178
179        let mut stream = message_stream_from_vec(messages.clone());
180
181        let msg1 = stream.next().await.unwrap().unwrap();
182        assert_eq!(msg1, Bytes::from("msg1"));
183
184        let msg2 = stream.next().await.unwrap().unwrap();
185        assert_eq!(msg2, Bytes::from("msg2"));
186
187        let msg3 = stream.next().await.unwrap().unwrap();
188        assert_eq!(msg3, Bytes::from("msg3"));
189
190        assert!(stream.next().await.is_none());
191    }
192
193    #[tokio::test]
194    async fn test_empty_message_stream() {
195        let mut stream = empty_message_stream();
196        assert!(stream.next().await.is_none());
197    }
198
199    #[tokio::test]
200    async fn test_single_message_stream() {
201        let mut stream = single_message_stream(Bytes::from("single"));
202
203        let msg = stream.next().await.unwrap().unwrap();
204        assert_eq!(msg, Bytes::from("single"));
205
206        assert!(stream.next().await.is_none());
207    }
208
209    #[tokio::test]
210    async fn test_error_stream() {
211        let mut stream = error_stream(Status::internal("test error"));
212
213        let result = stream.next().await.unwrap();
214        assert!(result.is_err());
215
216        let error = result.unwrap_err();
217        assert_eq!(error.code(), tonic::Code::Internal);
218        assert_eq!(error.message(), "test error");
219
220        assert!(stream.next().await.is_none());
221    }
222
223    #[tokio::test]
224    async fn test_message_stream_from_vec_empty() {
225        let messages: Vec<Bytes> = vec![];
226        let mut stream = message_stream_from_vec(messages);
227        assert!(stream.next().await.is_none());
228    }
229
230    #[tokio::test]
231    async fn test_message_stream_from_vec_large() {
232        let mut messages = vec![];
233        for i in 0..100 {
234            messages.push(Bytes::from(format!("message{}", i)));
235        }
236
237        let mut stream = message_stream_from_vec(messages);
238
239        for i in 0..100 {
240            let msg = stream.next().await.unwrap().unwrap();
241            assert_eq!(msg, Bytes::from(format!("message{}", i)));
242        }
243
244        assert!(stream.next().await.is_none());
245    }
246
247    #[tokio::test]
248    async fn test_from_tonic_stream() {
249        let messages = vec![
250            Ok(Bytes::from("a")),
251            Ok(Bytes::from("b")),
252            Err(Status::cancelled("done")),
253        ];
254
255        let tonic_stream = futures_util::stream::iter(messages);
256        let mut stream = from_tonic_stream(tonic_stream);
257
258        let msg1 = stream.next().await.unwrap().unwrap();
259        assert_eq!(msg1, Bytes::from("a"));
260
261        let msg2 = stream.next().await.unwrap().unwrap();
262        assert_eq!(msg2, Bytes::from("b"));
263
264        let result = stream.next().await.unwrap();
265        assert!(result.is_err());
266
267        assert!(stream.next().await.is_none());
268    }
269
270    #[test]
271    fn test_streaming_request_creation() {
272        let stream = empty_message_stream();
273        let request = StreamingRequest {
274            service_name: "test.Service".to_string(),
275            method_name: "StreamMethod".to_string(),
276            message_stream: stream,
277            metadata: tonic::metadata::MetadataMap::new(),
278        };
279
280        assert_eq!(request.service_name, "test.Service");
281        assert_eq!(request.method_name, "StreamMethod");
282    }
283
284    #[test]
285    fn test_streaming_response_creation() {
286        let stream = empty_message_stream();
287        let response = StreamingResponse {
288            message_stream: stream,
289            metadata: tonic::metadata::MetadataMap::new(),
290            trailers: None,
291        };
292
293        assert!(response.metadata.is_empty());
294        assert!(response.trailers.is_none());
295    }
296
297    #[test]
298    fn test_streaming_response_with_trailers() {
299        let stream = empty_message_stream();
300        let mut trailers = tonic::metadata::MetadataMap::new();
301        trailers.insert(
302            "x-request-id",
303            "test-123"
304                .parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>()
305                .unwrap(),
306        );
307
308        let response = StreamingResponse {
309            message_stream: stream,
310            metadata: tonic::metadata::MetadataMap::new(),
311            trailers: Some(trailers),
312        };
313
314        assert!(response.metadata.is_empty());
315        assert!(response.trailers.is_some());
316        let trailers = response.trailers.unwrap();
317        assert_eq!(trailers.len(), 1);
318    }
319}