use bytes::Bytes;
use futures_util::StreamExt;
use http_body::Frame;
use http_body_util::{Full, StreamBody};
use sse_stream::{Sse, SseStream};
struct ChainedFrameBody {
sent: bool,
first: &'static [u8],
second: &'static [u8],
}
impl http_body::Body for ChainedFrameBody {
type Data = bytes::buf::Chain<Bytes, Bytes>;
type Error = std::convert::Infallible;
fn poll_frame(
mut self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if self.sent {
return std::task::Poll::Ready(None);
}
self.sent = true;
let chained = bytes::Buf::chain(
Bytes::from_static(self.first),
Bytes::from_static(self.second),
);
std::task::Poll::Ready(Some(Ok(Frame::data(chained))))
}
}
#[tokio::test]
async fn test_multi_segment_buf_frame_not_truncated() {
let body = ChainedFrameBody {
sent: false,
first: b"data: hel",
second: b"lo\n\n",
};
let mut sse_body = SseStream::new(body);
let mut out = Vec::new();
while let Some(sse) = sse_body.next().await {
out.push(sse.expect("parse error"));
}
assert_eq!(
out,
vec![Sse {
event: None,
data: Some("hello".into()),
id: None,
retry: None,
}],
"multi-segment Buf frame must be fully consumed"
);
}
async fn collect_from_full(data: &[u8]) -> Vec<Sse> {
let body = Full::<Bytes>::from(data.to_vec());
let mut sse_body = SseStream::new(body);
let mut out = Vec::new();
while let Some(sse) = sse_body.next().await {
out.push(sse.expect("parse error"));
}
out
}
async fn collect_from_chunks(chunks: Vec<&'static [u8]>) -> Vec<Sse> {
let stream = futures_util::stream::iter(
chunks
.into_iter()
.map(|c| Ok::<_, std::convert::Infallible>(Frame::data(Bytes::from_static(c)))),
);
let body = StreamBody::new(stream);
let mut sse_body = SseStream::new(body);
let mut out = Vec::new();
while let Some(sse) = sse_body.next().await {
out.push(sse.expect("parse error"));
}
out
}
fn data_only(s: &str) -> Sse {
Sse {
event: None,
data: Some(s.to_owned()),
id: None,
retry: None,
}
}
#[tokio::test]
async fn test_bytes_parse() {
let bytes = include_bytes!("data/test_stream.sse");
let body = Full::<Bytes>::from(bytes.to_vec());
let mut sse_body = sse_stream::SseStream::new(body);
while let Some(sse) = sse_body.next().await {
println!("{:?}", sse.unwrap());
}
}
#[tokio::test]
async fn test_bom_header_at_start() {
let sse_data = b"\xEF\xBB\xBFdata: hello\n\n";
let body = Full::<Bytes>::from(sse_data.to_vec());
let mut sse_body = sse_stream::SseStream::new(body);
let sse = sse_body
.next()
.await
.expect("Should have one SSE event")
.unwrap();
assert_eq!(sse.data, Some("hello".to_string()));
}
#[tokio::test]
async fn test_line_break_crlf() {
let out = collect_from_full(b"data: a\r\ndata: b\r\n\r\n").await;
assert_eq!(out, vec![data_only("a\nb")]);
}
#[tokio::test]
async fn test_line_break_cr_only() {
let out = collect_from_full(b"data: a\rdata: b\r\r").await;
assert_eq!(out, vec![data_only("a\nb")]);
}
#[tokio::test]
async fn test_line_break_mixed() {
let payload: &[u8] = b"data: one\r\ndata: two\rdata: three\n\r\n";
let out = collect_from_full(payload).await;
assert_eq!(out, vec![data_only("one\ntwo\nthree")]);
}
#[tokio::test]
async fn test_cr_lf_split_across_chunks() {
let out = collect_from_chunks(vec![b"data: hello\r", b"\ndata: world\n\n"]).await;
assert_eq!(out, vec![data_only("hello\nworld")]);
}
#[tokio::test]
async fn test_cr_then_non_lf_across_chunks() {
let out = collect_from_chunks(vec![b"data: a\r", b"data: b\n\n"]).await;
assert_eq!(out, vec![data_only("a\nb")]);
}
#[tokio::test]
async fn test_cr_then_cr_across_chunks() {
let out = collect_from_chunks(vec![b"data: a\r", b"\rdata: b\n\n"]).await;
assert_eq!(out, vec![data_only("a"), data_only("b")]);
}
#[tokio::test]
async fn test_dispatch_boundary_split_at_cr() {
let out = collect_from_chunks(vec![b"data: x\r\n\r", b"\n"]).await;
assert_eq!(out, vec![data_only("x")]);
}
#[tokio::test]
async fn test_multiple_consecutive_cr() {
let out = collect_from_full(b"data: a\r\r\r").await;
assert_eq!(out, vec![data_only("a")]);
}
#[tokio::test]
async fn test_comment_lines() {
let out = collect_from_full(b": this is a comment\ndata: hi\n: another\n\n").await;
assert_eq!(out, vec![data_only("hi")]);
}
#[tokio::test]
async fn test_empty_data_field() {
let out = collect_from_full(b"data:\n\n").await;
assert_eq!(out, vec![data_only("")]);
}
#[tokio::test]
async fn test_two_empty_data_lines_join_with_newline() {
let out = collect_from_full(b"data:\ndata:\n\n").await;
assert_eq!(out, vec![data_only("\n")]);
}
#[tokio::test]
async fn test_only_one_leading_space_stripped() {
let out = collect_from_full(b"data: hello\n\n").await;
assert_eq!(out, vec![data_only(" hello")]);
}
#[tokio::test]
async fn test_id_with_null_byte_ignored() {
let payload: &[u8] = b"id: ab\x00cd\ndata: x\n\n";
let stream = futures_util::stream::iter(std::iter::once(Ok::<_, std::convert::Infallible>(
Frame::data(Bytes::from_static(payload)),
)));
let body = StreamBody::new(stream);
let mut sse_body = SseStream::new(body);
let mut out = Vec::new();
while let Some(sse) = sse_body.next().await {
out.push(sse.expect("parse error"));
}
assert_eq!(out, vec![data_only("x")], "id with NULL must be ignored");
}
#[tokio::test]
async fn test_incomplete_trailing_event_discarded() {
let out = collect_from_full(b"data: complete\n\ndata: incomplete\n").await;
assert_eq!(out, vec![data_only("complete")]);
}
#[tokio::test]
async fn test_multiple_events_split_chunks() {
let out = collect_from_chunks(vec![
b"event: a\ndata: 1\n",
b"\nevent: b\nda",
b"ta: 2\n\n",
])
.await;
assert_eq!(
out,
vec![
Sse {
event: Some("a".into()),
data: Some("1".into()),
..Default::default()
},
Sse {
event: Some("b".into()),
data: Some("2".into()),
..Default::default()
},
]
);
}
#[tokio::test]
async fn test_bom_split_across_chunks() {
let chunk1 = Bytes::from_static(b"\xEF");
let chunk2 = Bytes::from_static(b"\xBB\xBFdata: hello\n\n");
let body = {
let stream = futures_util::stream::iter(
[chunk1, chunk2]
.into_iter()
.map(|chunk| Ok::<_, std::convert::Infallible>(Frame::data(chunk))),
);
StreamBody::new(stream)
};
let mut sse_body = sse_stream::SseStream::new(body);
let sse = sse_body
.next()
.await
.expect("Should have one SSE event")
.unwrap();
assert_eq!(sse.data, Some("hello".to_string()));
}