spikard_http/grpc/
streaming.rs

1//! Streaming support utilities for gRPC
2//!
3//! This module provides utilities for handling streaming RPCs:
4//! - Client streaming (receiving stream of messages)
5//! - Server streaming (sending stream of messages)
6//! - Bidirectional streaming (both directions)
7
8use bytes::Bytes;
9use futures_util::Stream;
10use std::pin::Pin;
11use tonic::Status;
12
13/// Type alias for a stream of protobuf message bytes
14///
15/// Used for both client streaming (incoming) and server streaming (outgoing).
16/// Each item in the stream is either:
17/// - Ok(Bytes): A serialized protobuf message
18/// - Err(Status): A gRPC error
19pub type MessageStream = Pin<Box<dyn Stream<Item = Result<Bytes, Status>> + Send>>;
20
21/// Request for client streaming RPC
22///
23/// Contains metadata and a stream of incoming messages from the client.
24pub struct StreamingRequest {
25    /// Service name
26    pub service_name: String,
27    /// Method name
28    pub method_name: String,
29    /// Stream of incoming protobuf messages
30    pub message_stream: MessageStream,
31    /// Request metadata
32    pub metadata: tonic::metadata::MetadataMap,
33}
34
35/// Response for server streaming RPC
36///
37/// Contains metadata and a stream of outgoing messages to the client.
38pub struct StreamingResponse {
39    /// Stream of outgoing protobuf messages
40    pub message_stream: MessageStream,
41    /// Response metadata
42    pub metadata: tonic::metadata::MetadataMap,
43}
44
45/// Helper to create a message stream from a vector of bytes
46///
47/// Useful for testing and for handlers that want to create a stream
48/// from a fixed set of messages.
49///
50/// # Example
51///
52/// ```ignore
53/// use spikard_http::grpc::streaming::message_stream_from_vec;
54/// use bytes::Bytes;
55///
56/// let messages = vec![
57///     Bytes::from("message1"),
58///     Bytes::from("message2"),
59/// ];
60///
61/// let stream = message_stream_from_vec(messages);
62/// ```
63pub fn message_stream_from_vec(messages: Vec<Bytes>) -> MessageStream {
64    Box::pin(futures_util::stream::iter(messages.into_iter().map(Ok)))
65}
66
67/// Helper to create an empty message stream
68///
69/// Useful for testing or for handlers that need to return an empty stream.
70pub fn empty_message_stream() -> MessageStream {
71    Box::pin(futures_util::stream::empty())
72}
73
74/// Helper to create a single-message stream
75///
76/// Useful for converting unary responses to streaming responses.
77///
78/// # Example
79///
80/// ```ignore
81/// use spikard_http::grpc::streaming::single_message_stream;
82/// use bytes::Bytes;
83///
84/// let stream = single_message_stream(Bytes::from("response"));
85/// ```
86pub fn single_message_stream(message: Bytes) -> MessageStream {
87    Box::pin(futures_util::stream::once(async move { Ok(message) }))
88}
89
90/// Helper to create an error stream
91///
92/// Returns a stream that immediately yields a gRPC error.
93///
94/// # Example
95///
96/// ```ignore
97/// use spikard_http::grpc::streaming::error_stream;
98/// use tonic::Status;
99///
100/// let stream = error_stream(Status::internal("Something went wrong"));
101/// ```
102pub fn error_stream(status: Status) -> MessageStream {
103    Box::pin(futures_util::stream::once(async move { Err(status) }))
104}
105
106/// Helper to convert a Tonic ReceiverStream to our MessageStream
107///
108/// This is used in the service bridge to convert Tonic's streaming types
109/// to our internal representation.
110pub fn from_tonic_stream<S>(stream: S) -> MessageStream
111where
112    S: Stream<Item = Result<Bytes, Status>> + Send + 'static,
113{
114    Box::pin(stream)
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120    use futures_util::StreamExt;
121
122    #[tokio::test]
123    async fn test_message_stream_from_vec() {
124        let messages = vec![Bytes::from("msg1"), Bytes::from("msg2"), Bytes::from("msg3")];
125
126        let mut stream = message_stream_from_vec(messages.clone());
127
128        let msg1 = stream.next().await.unwrap().unwrap();
129        assert_eq!(msg1, Bytes::from("msg1"));
130
131        let msg2 = stream.next().await.unwrap().unwrap();
132        assert_eq!(msg2, Bytes::from("msg2"));
133
134        let msg3 = stream.next().await.unwrap().unwrap();
135        assert_eq!(msg3, Bytes::from("msg3"));
136
137        assert!(stream.next().await.is_none());
138    }
139
140    #[tokio::test]
141    async fn test_empty_message_stream() {
142        let mut stream = empty_message_stream();
143        assert!(stream.next().await.is_none());
144    }
145
146    #[tokio::test]
147    async fn test_single_message_stream() {
148        let mut stream = single_message_stream(Bytes::from("single"));
149
150        let msg = stream.next().await.unwrap().unwrap();
151        assert_eq!(msg, Bytes::from("single"));
152
153        assert!(stream.next().await.is_none());
154    }
155
156    #[tokio::test]
157    async fn test_error_stream() {
158        let mut stream = error_stream(Status::internal("test error"));
159
160        let result = stream.next().await.unwrap();
161        assert!(result.is_err());
162
163        let error = result.unwrap_err();
164        assert_eq!(error.code(), tonic::Code::Internal);
165        assert_eq!(error.message(), "test error");
166
167        assert!(stream.next().await.is_none());
168    }
169
170    #[tokio::test]
171    async fn test_message_stream_from_vec_empty() {
172        let messages: Vec<Bytes> = vec![];
173        let mut stream = message_stream_from_vec(messages);
174        assert!(stream.next().await.is_none());
175    }
176
177    #[tokio::test]
178    async fn test_message_stream_from_vec_large() {
179        let mut messages = vec![];
180        for i in 0..100 {
181            messages.push(Bytes::from(format!("message{}", i)));
182        }
183
184        let mut stream = message_stream_from_vec(messages);
185
186        for i in 0..100 {
187            let msg = stream.next().await.unwrap().unwrap();
188            assert_eq!(msg, Bytes::from(format!("message{}", i)));
189        }
190
191        assert!(stream.next().await.is_none());
192    }
193
194    #[tokio::test]
195    async fn test_from_tonic_stream() {
196        let messages = vec![
197            Ok(Bytes::from("a")),
198            Ok(Bytes::from("b")),
199            Err(Status::cancelled("done")),
200        ];
201
202        let tonic_stream = futures_util::stream::iter(messages);
203        let mut stream = from_tonic_stream(tonic_stream);
204
205        let msg1 = stream.next().await.unwrap().unwrap();
206        assert_eq!(msg1, Bytes::from("a"));
207
208        let msg2 = stream.next().await.unwrap().unwrap();
209        assert_eq!(msg2, Bytes::from("b"));
210
211        let result = stream.next().await.unwrap();
212        assert!(result.is_err());
213
214        assert!(stream.next().await.is_none());
215    }
216
217    #[test]
218    fn test_streaming_request_creation() {
219        let stream = empty_message_stream();
220        let request = StreamingRequest {
221            service_name: "test.Service".to_string(),
222            method_name: "StreamMethod".to_string(),
223            message_stream: stream,
224            metadata: tonic::metadata::MetadataMap::new(),
225        };
226
227        assert_eq!(request.service_name, "test.Service");
228        assert_eq!(request.method_name, "StreamMethod");
229    }
230
231    #[test]
232    fn test_streaming_response_creation() {
233        let stream = empty_message_stream();
234        let response = StreamingResponse {
235            message_stream: stream,
236            metadata: tonic::metadata::MetadataMap::new(),
237        };
238
239        assert!(response.metadata.is_empty());
240    }
241}