pub type MessageStream = Pin<Box<dyn Stream<Item = Result<Bytes, Status>> + Send>>;
Type alias for a stream of protobuf message bytes.
Used for both client streaming (incoming) and server streaming (outgoing). Each item in the stream is either:
- Ok(Bytes): A serialized protobuf message
- Err(Status): A gRPC error
§Backpressure Considerations
Streaming responses should implement backpressure handling to avoid memory buildup with slow clients:
- Problem: If a client reads slowly but the handler produces messages quickly, messages will queue in memory, potentially causing high memory usage or OOM errors.
- Solution: The gRPC layer (Tonic) applies backpressure automatically via HTTP/2 flow control over the underlying TCP connection: when a client stops reading, the stream stops being polled. Handlers should still account for this, since a slow consumer will stall the producer.
- Best Practice: For long-running or high-volume streams, implement rate limiting or flow control in the handler to avoid overwhelming the network buffer.
§Example: Rate-limited streaming
use spikard_http::grpc::streaming::MessageStream;
use bytes::Bytes;
use std::time::Duration;
use tokio::time::sleep;
use futures_util::stream::{self, StreamExt};

// Handler that sends 1000 messages with rate limiting.
fn create_rate_limited_stream() -> MessageStream {
    let messages = (0..1000).map(|i| Ok(Bytes::from(format!("message_{}", i))));

    // Delay between messages so the handler cannot outrun slow clients.
    let stream = stream::iter(messages).then(|msg| async {
        sleep(Duration::from_millis(1)).await; // 1 ms between messages
        msg
    });

    Box::pin(stream)
}
§Memory Management
Keep the following in mind when implementing large streams:
- Messages are buffered in the gRPC transport layer’s internal queue
- Slow clients will cause the queue to grow, increasing memory usage
- Very large individual messages may cause buffer allocation spikes
- Consider implementing stream chunking for very large responses (split one large message into many small ones)
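The chunking suggestion can be sketched with plain byte slices. `chunk_message` and the 64 KiB chunk size are illustrative assumptions, not part of this crate; in a handler, each chunk would be wrapped in `Bytes` and yielded as its own `Ok` item, and the receiver must reassemble the chunks before decoding if they together form one protobuf message.

```rust
// Split one large serialized payload into bounded-size pieces so that no
// single stream item forces a large buffer allocation in the transport.
// Sketch only: the chunk size is an assumption, and a receiver must
// reassemble the pieces before protobuf-decoding the original message.
fn chunk_message(payload: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    payload.chunks(chunk_size).map(|c| c.to_vec()).collect()
}

fn main() {
    // A 150 KiB payload split into 64 KiB chunks yields three items:
    // 64 KiB + 64 KiB + 22 KiB.
    let payload = vec![0u8; 150 * 1024];
    let chunks = chunk_message(&payload, 64 * 1024);
    assert_eq!(chunks.len(), 3);
    assert_eq!(chunks[2].len(), 22 * 1024);
}
```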
§Aliased Type
pub struct MessageStream { /* private fields */ }