Type Alias MessageStream 

pub type MessageStream = Pin<Box<dyn Stream<Item = Result<Bytes, Status>> + Send>>;

Type alias for a stream of protobuf message bytes

Used for both client streaming (incoming) and server streaming (outgoing). Each item in the stream is either:

  • Ok(Bytes): A serialized protobuf message
  • Err(Status): A gRPC error

§Backpressure Considerations

Handlers that return streaming responses should account for backpressure so that slow clients do not cause memory buildup:

  • Problem: If the handler produces messages faster than the client reads them, unsent messages queue in memory, which can lead to high memory usage or out-of-memory (OOM) errors.
  • Solution: The gRPC layer (Tonic) applies backpressure automatically through HTTP/2 flow control on the underlying TCP connection, so a stream that produces items lazily is polled only as fast as the client can receive. This does not protect a handler that builds messages eagerly, for example by pushing them into an unbounded buffer ahead of the transport.
  • Best Practice: For long-running or high-volume streams, implement rate limiting or flow control in the handler itself so that it does not overwhelm the network buffer.
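
One way to apply flow control in the handler is to put a bounded queue between the producer and the stream, so the producer gets a clear signal when the consumer falls behind. The sketch below illustrates only that principle, using the standard library's synchronous `sync_channel`; it is not part of this crate's API, and the helper name `accepted_before_full` is hypothetical. In an async handler the same role would typically be played by a bounded async channel such as `tokio::sync::mpsc::channel`.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: tries to push `attempts` messages into a bounded
/// queue of size `capacity` that is never drained, and returns how many
/// messages were accepted before the queue reported that it was full.
fn accepted_before_full(capacity: usize, attempts: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected, but it never reads.
    // This models a client that has stopped consuming.
    let (tx, _rx) = sync_channel::<Vec<u8>>(capacity);

    let mut accepted = 0;
    for i in 0..attempts {
        let msg = format!("message_{}", i).into_bytes();
        match tx.try_send(msg) {
            Ok(()) => accepted += 1,
            // The queue is full: a real producer would wait or slow down
            // here instead of allocating more messages.
            Err(TrySendError::Full(_)) => break,
            // The receiver went away: stop producing.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 4 and a consumer that never reads, only 4 messages are held in memory no matter how many the producer wants to send. An unbounded queue would accept all of them.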

§Example: Rate-limited streaming

use spikard_http::grpc::streaming::MessageStream;
use bytes::Bytes;
use std::time::Duration;
use tokio::time::sleep;
use futures_util::stream::{self, StreamExt};

// Handler that sends 1000 messages with rate limiting
fn create_rate_limited_stream() -> MessageStream {
    let messages = (0..1000).map(|i| {
        Ok(Bytes::from(format!("message_{}", i)))
    });

    // Stream with delay between messages to avoid overwhelming the client
    let stream = stream::iter(messages)
        .then(|msg| async {
            sleep(Duration::from_millis(1)).await;  // 1ms between messages
            msg
        });

    Box::pin(stream)
}

§Memory Management

Keep the following in mind when implementing large streams:

  • Messages are buffered in the gRPC transport layer’s internal queue
  • Slow clients will cause the queue to grow, increasing memory usage
  • Very large individual messages may cause buffer allocation spikes
  • Consider implementing stream chunking for very large responses (split one large message into many small ones)
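
The chunking suggestion in the last point can be sketched with the standard library alone. This is a minimal illustration, not this crate's API: `split_into_chunks` is a hypothetical helper, and it assumes the receiver knows how to reassemble the pieces in order. In a real handler each piece would be wrapped in its own protobuf message and converted to `Bytes` before being yielded from the stream.

```rust
/// Hypothetical helper: splits `payload` into pieces of at most `max_chunk`
/// bytes each, preserving order. The last piece may be shorter.
fn split_into_chunks(payload: &[u8], max_chunk: usize) -> Vec<Vec<u8>> {
    assert!(max_chunk > 0, "max_chunk must be non-zero");
    payload
        .chunks(max_chunk)
        .map(|piece| piece.to_vec())
        .collect()
}
```

Splitting a 10-byte payload with `max_chunk = 4` yields pieces of 4, 4, and 2 bytes, and concatenating them restores the original payload. Smaller pieces keep each buffer allocation bounded, at the cost of more messages on the wire.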

Aliased Type§

pub struct MessageStream { /* private fields */ }