Skip to main content

spikard_http/grpc/
framing.rs

1//! HTTP/2 gRPC frame parsing for client streaming support
2//!
3//! This module provides parsing of gRPC messages from HTTP/2 request bodies.
//! gRPC messages are framed as described in the gRPC over HTTP/2 protocol specification
//! (`Length-Prefixed-Message`):
5//!
//! ```text
//! +-----------------+--------------------------+
//! | Compressed-Flag |      Message-Length      |
//! |    (1 byte)     | (4 bytes, big-endian u32)|
//! +-----------------+--------------------------+
//! |                                            |
//! |       Message (Message-Length bytes)       |
//! |                                            |
//! +--------------------------------------------+
//! ```
16//!
17//! The compression flag indicates whether the message is compressed (1 = compressed, 0 = uncompressed).
18//! The length is encoded as a big-endian u32, indicating the size of the message bytes.
19//!
20//! # Protocol Details
21//!
22//! - **Compression Flag**: 1 byte, value 0 or 1
//! - **Message Length**: 4 bytes, big-endian u32, maximum 2^32 − 1 bytes (about 4 GiB)
24//! - **Message Data**: N bytes, where N is the length from the header
25//!
26//! # Stream Processing
27//!
//! The parser first reads the entire HTTP/2 request body into memory, then repeatedly:
//! 1. Reads the 5-byte frame header (compression flag + length)
//! 2. Parses the length as a big-endian u32
//! 3. Validates the length against `max_message_size`
//! 4. Reads the message bytes
//! 5. Collects the message
//! 6. Repeats until the buffered body is exhausted
//!
//! Every frame is validated before the resulting stream is returned.
35//!
36//! # Error Handling
37//!
//! The parser reports failures using standard gRPC status codes:
39//! - `INTERNAL`: Protocol parsing errors (incomplete frames, read errors)
40//! - `RESOURCE_EXHAUSTED`: Message size exceeds limit
41//! - `UNIMPLEMENTED`: Compression requested (not supported)
42//!
43//! # Example
44//!
45//! ```ignore
46//! use spikard_http::grpc::framing::parse_grpc_client_stream;
47//! use axum::body::Body;
48//! use bytes::Bytes;
49//! use futures_util::StreamExt;
50//!
51//! let body = Body::from("...");
52//! let max_size = 4 * 1024 * 1024; // 4MB
53//! let mut stream = parse_grpc_client_stream(body, max_size).await?;
54//!
55//! while let Some(result) = stream.next().await {
56//!     match result {
57//!         Ok(message) => println!("Message: {:?}", message),
58//!         Err(status) => eprintln!("Error: {}", status),
59//!     }
60//! }
61//! ```
62
63use bytes::{Buf, Bytes, BytesMut};
64use futures_util::stream;
65use tonic::Status;
66
67use super::streaming::MessageStream;
68
69/// Parses an HTTP/2 gRPC request body as a stream of messages
70///
71/// Reads the gRPC frame format from the body stream, validating each frame
72/// and yielding individual message bytes.
73///
74/// # Arguments
75///
76/// * `body` - The HTTP/2 request body stream
77/// * `max_message_size` - Maximum allowed message size in bytes (validated per message)
78///
79/// # Returns
80///
81/// A `MessageStream` yielding:
82/// - `Ok(Bytes)`: A complete parsed message
83/// - `Err(Status)`: A gRPC protocol error
84///
85/// # Errors
86///
87/// Returns gRPC errors for:
88/// - Incomplete frame (EOF before 5-byte header): `INTERNAL`
89/// - Incomplete message (EOF before all message bytes): `INTERNAL`
90/// - Message size > max_message_size: `RESOURCE_EXHAUSTED`
91/// - Compression flag != 0: `UNIMPLEMENTED`
92/// - Read errors from the body stream: `INTERNAL`
93///
94/// # Example
95///
96/// ```ignore
97/// let body = Body::from(vec![
98///     0x00,                      // compression: no
99///     0x00, 0x00, 0x00, 0x05,   // length: 5 bytes
100///     b'h', b'e', b'l', b'l', b'o',  // message
101/// ]);
102///
103/// let stream = parse_grpc_client_stream(body, 1024).await?;
104/// ```
105pub async fn parse_grpc_client_stream(
106    body: axum::body::Body,
107    max_message_size: usize,
108) -> Result<MessageStream, Status> {
109    // Convert body into bytes
110    let body_bytes = axum::body::to_bytes(body, usize::MAX)
111        .await
112        .map_err(|e| Status::internal(format!("Failed to read body: {}", e)))?;
113
114    // Create a buffered reader
115    let buffer = BytesMut::from(&body_bytes[..]);
116
117    // Parse frames from the buffer
118    let messages = parse_all_frames(buffer, max_message_size)?;
119
120    // Convert to a MessageStream
121    Ok(Box::pin(stream::iter(messages.into_iter().map(Ok))))
122}
123
124/// Internal: Parse all frames from a buffer
125fn parse_all_frames(mut buffer: BytesMut, max_message_size: usize) -> Result<Vec<Bytes>, Status> {
126    let mut messages = Vec::new();
127
128    while !buffer.is_empty() {
129        // Check if we have enough bytes for the frame header
130        if buffer.len() < 5 {
131            return Err(Status::internal(
132                "Incomplete gRPC frame header: expected 5 bytes, got less",
133            ));
134        }
135
136        // Read the compression flag (1 byte)
137        let compression_flag = buffer[0];
138        if compression_flag != 0 {
139            return Err(Status::unimplemented("Message compression not supported"));
140        }
141
142        // Read the message length (4 bytes, big-endian)
143        let length_bytes = &buffer[1..5];
144        let message_length =
145            u32::from_be_bytes([length_bytes[0], length_bytes[1], length_bytes[2], length_bytes[3]]) as usize;
146
147        // Validate message length against max size
148        if message_length > max_message_size {
149            return Err(Status::resource_exhausted(format!(
150                "Message size {} exceeds maximum allowed size of {}",
151                message_length, max_message_size
152            )));
153        }
154
155        // Check if we have the complete message
156        let total_frame_size = 5 + message_length;
157        if buffer.len() < total_frame_size {
158            return Err(Status::internal(
159                "Incomplete gRPC message: expected more bytes than available",
160            ));
161        }
162
163        // Extract the message bytes
164        let message = buffer[5..total_frame_size].to_vec();
165        messages.push(Bytes::from(message));
166
167        // Advance the buffer
168        buffer.advance(total_frame_size);
169    }
170
171    Ok(messages)
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;

    /// Builds one uncompressed gRPC frame (flag 0 + big-endian u32 length) around `payload`.
    fn frame(payload: &[u8]) -> Vec<u8> {
        let len = u32::try_from(payload.len()).expect("test payload fits in a u32 length");
        let mut out = Vec::with_capacity(5 + payload.len());
        out.push(0x00);
        out.extend_from_slice(&len.to_be_bytes());
        out.extend_from_slice(payload);
        out
    }

    /// Parses `bytes` and collects every yielded message, panicking on any error.
    async fn parse_ok(bytes: Vec<u8>, max_message_size: usize) -> Vec<Bytes> {
        let stream = parse_grpc_client_stream(axum::body::Body::from(bytes), max_message_size)
            .await
            .unwrap_or_else(|status| panic!("expected success, got {:?}", status));
        stream
            .map(|item| item.unwrap_or_else(|status| panic!("stream yielded error: {:?}", status)))
            .collect::<Vec<_>>()
            .await
    }

    /// Parses `bytes` and returns the eager error, panicking if parsing succeeded.
    async fn parse_err(bytes: Vec<u8>, max_message_size: usize) -> Status {
        match parse_grpc_client_stream(axum::body::Body::from(bytes), max_message_size).await {
            Ok(_) => panic!("expected parse_grpc_client_stream to fail"),
            Err(status) => status,
        }
    }

    #[tokio::test]
    async fn test_single_frame_parsing() {
        // Frame: compression=0, length=5, message="hello"
        let body = vec![
            0x00, // compression: no
            0x00, 0x00, 0x00, 0x05, // length: 5 bytes (big-endian)
            b'h', b'e', b'l', b'l', b'o', // message
        ];

        let messages = parse_ok(body, 1024).await;
        assert_eq!(messages, vec![Bytes::from_static(b"hello")]);
    }

    #[tokio::test]
    async fn test_multiple_frames() {
        // Two frames back-to-back
        let mut body = frame(b"hello");
        body.extend(frame(b"world"));

        let messages = parse_ok(body, 1024).await;
        assert_eq!(
            messages,
            vec![Bytes::from_static(b"hello"), Bytes::from_static(b"world")]
        );
    }

    #[tokio::test]
    async fn test_empty_body() {
        assert!(parse_ok(Vec::new(), 1024).await.is_empty());
    }

    #[tokio::test]
    async fn test_frame_size_at_limit() {
        let message = b"0123456789"; // exactly 10 bytes
        let messages = parse_ok(frame(message), 10).await;
        assert_eq!(messages, vec![Bytes::from_static(message)]);
    }

    #[tokio::test]
    async fn test_frame_exceeds_limit() {
        // 7-byte message against a 5-byte limit
        let status = parse_err(frame(b"toolong"), 5).await;
        assert_eq!(status.code(), tonic::Code::ResourceExhausted);
    }

    #[tokio::test]
    async fn test_incomplete_frame_header() {
        // Only 3 bytes of 5-byte header
        let status = parse_err(vec![0x00, 0x00, 0x00], 1024).await;
        assert_eq!(status.code(), tonic::Code::Internal);
    }

    #[tokio::test]
    async fn test_incomplete_frame_body() {
        // Header says 10 bytes but only provide 5
        let mut body = vec![0x00, 0x00, 0x00, 0x00, 0x0a];
        body.extend_from_slice(b"short");

        let status = parse_err(body, 1024).await;
        assert_eq!(status.code(), tonic::Code::Internal);
    }

    #[tokio::test]
    async fn test_compression_flag_set() {
        // Frame with compression flag = 1 (not supported)
        let mut body = frame(b"hello");
        body[0] = 0x01;

        let status = parse_err(body, 1024).await;
        assert_eq!(status.code(), tonic::Code::Unimplemented);
    }

    #[tokio::test]
    async fn test_large_message_length() {
        let message = b"x".repeat(1000);
        let encoded = frame(&message);
        // length: 1000 encoded big-endian
        assert_eq!(encoded[..5], [0x00, 0x00, 0x00, 0x03, 0xe8]);

        let messages = parse_ok(encoded, 2000).await;
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0], message);
    }

    #[tokio::test]
    async fn test_zero_length_message() {
        // A 0-byte message is valid in gRPC
        let messages = parse_ok(frame(b""), 1024).await;
        assert_eq!(messages, vec![Bytes::new()]);
    }

    #[tokio::test]
    async fn test_multiple_frames_with_mixed_sizes() {
        let payloads: [&[u8]; 4] = [b"abc", b"defghij", b"", b"x"];
        let body: Vec<u8> = payloads.iter().flat_map(|p| frame(p)).collect();

        let messages = parse_ok(body, 1024).await;
        let expected: Vec<Bytes> = payloads.iter().map(|p| Bytes::copy_from_slice(p)).collect();
        assert_eq!(messages, expected);
    }

    #[test]
    fn test_big_endian_length_parsing() {
        // Length 256 is [0x00, 0x00, 0x01, 0x00] big-endian. A little-endian reading would
        // see 65536 and reject the frame against the 1024-byte limit.
        let mut raw = vec![0x00, 0x00, 0x00, 0x01, 0x00];
        raw.extend_from_slice(&[b'a'; 256]);

        let messages = parse_all_frames(BytesMut::from(&raw[..]), 1024).unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].len(), 256);
    }

    #[test]
    fn test_big_endian_max_value() {
        // A u32::MAX length must be decoded in full and rejected against the limit.
        let raw = [0x00, 0xff, 0xff, 0xff, 0xff];

        let status = parse_all_frames(BytesMut::from(&raw[..]), 1024).unwrap_err();
        assert_eq!(status.code(), tonic::Code::ResourceExhausted);
        assert!(status.message().contains(&u32::MAX.to_string()));
    }

    #[test]
    fn test_max_length_header_without_limit_is_incomplete() {
        // With no effective size limit, a huge declared length must be reported as an
        // incomplete message rather than overflowing or panicking.
        let raw = [0x00, 0xff, 0xff, 0xff, 0xff, b'x'];

        let status = parse_all_frames(BytesMut::from(&raw[..]), usize::MAX).unwrap_err();
        assert_eq!(status.code(), tonic::Code::Internal);
    }

    #[test]
    fn test_trailing_partial_header_after_valid_frame() {
        let mut raw = frame(b"ok");
        raw.extend_from_slice(&[0x00, 0x00]);

        let status = parse_all_frames(BytesMut::from(&raw[..]), 1024).unwrap_err();
        assert_eq!(status.code(), tonic::Code::Internal);
    }

    #[tokio::test]
    async fn test_error_message_includes_size_info() {
        let message = b"x".repeat(150);

        let status = parse_err(frame(&message), 100).await;
        assert_eq!(status.code(), tonic::Code::ResourceExhausted);
        assert!(status.message().contains("150"));
        assert!(status.message().contains("100"));
    }

    #[tokio::test]
    async fn test_stream_collects_all_messages() {
        // Ensure that the returned stream yields every message, in order
        let body: Vec<u8> = (0..10u8).flat_map(|i| frame(&[b'0' + i])).collect();

        let messages = parse_ok(body, 1024).await;
        let expected: Vec<Bytes> = (0..10u8)
            .map(|i| Bytes::copy_from_slice(&[b'0' + i]))
            .collect();
        assert_eq!(messages, expected);
    }
}