Function warp::filters::sse::reply

pub fn reply<S>(event_stream: S) -> impl Reply where
    S: TryStream + Send + 'static,
    S::Ok: ServerSentEvent,
    S::Error: StdError + Send + Sync + 'static, 

Server-sent events reply

This function converts a stream of server-sent events into a Reply with:

  • Status of 200 OK
  • Header content-type: text/event-stream
  • Header cache-control: no-cache

Example


use std::time::Duration;
use futures::Stream;
use futures::stream::iter;
use std::convert::Infallible;
use warp::{Filter, sse::ServerSentEvent};
use serde_derive::Serialize;

/// Sample payload for the example; an instance is handed to
/// `warp::sse::json` inside `event_stream`.
///
/// The expected response body in the assertion shows it rendered as
/// `{"from":2,"text":"hello"}`.
#[derive(Serialize)]
struct Msg {
    /// Numeric sender field (`2` in the example event).
    from: u32,
    /// Message text (`"hello"` in the example event).
    text: String,
}

/// Builds a finite stream of three sample server-sent events, each wrapped
/// in `Ok`.
///
/// Every event is converted with `.boxed()` before it is placed in the
/// vector — presumably so that the differently composed events end up with
/// one common element type.
fn event_stream() -> impl Stream<Item = Result<impl ServerSentEvent, Infallible>> {
    // Unnamed event with data only
    let payload_only = warp::sse::data("payload").boxed();

    // Named event with ID and retry timeout
    let chat_message = (
        warp::sse::data("other message\nwith next line"),
        warp::sse::event("chat"),
        warp::sse::id(1),
        warp::sse::retry(Duration::from_millis(15000)),
    )
        .boxed();

    // Event with JSON data
    let json_message = (
        warp::sse::id(2),
        warp::sse::json(Msg {
            from: 2,
            text: "hello".into(),
        }),
    )
        .boxed();

    iter(vec![Ok(payload_only), Ok(chat_message), Ok(json_message)])
}

// Wrapped in an `async` block so the test request can be awaited.
async {
    // `GET /sse` answers with the sample event stream.
    let route = warp::path("sse")
        .and(warp::get())
        .map(|| warp::sse::reply(event_stream()));

    // Drive the filter with a test request and keep only the response body.
    let body = warp::test::request()
        .method("GET")
        .header("Connection", "Keep-Alive")
        .path("/sse")
        .reply(&route)
        .await
        .into_body();

    // Expected body: one group of `field:value` lines per event, each group
    // followed by an empty line.
    let expected = r#"data:payload

event:chat
data:other message
data:with next line
id:1
retry:15000

data:{"from":2,"text":"hello"}
id:2

"#;

    assert_eq!(body, expected);
};