1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use memchr::{memchr, memchr2};
3use std::{error::Error as StdError, fmt, str};
4
/// Carriage return; one of the bytes that can end a line.
const CR: u8 = b'\r';
/// Line feed; one of the bytes that can end a line.
const LF: u8 = b'\n';
/// Separator between a field name and its value.
const COLON: u8 = b':';
/// U+0000 NULL; an `id` value containing this character is ignored.
const NULL: char = '\0';
9
/// Error type reported by the parser in this module.
#[derive(Debug, Clone, Copy)]
pub enum Error {
    /// A field value was not valid UTF-8; wraps the underlying decode error.
    Utf8(std::str::Utf8Error),
}
15
16impl fmt::Display for Error {
17 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
18 match self {
19 Self::Utf8(err) => write!(f, "Invalid UTF8: {}", err),
20 }
21 }
22}
23
24impl StdError for Error {
25 fn source(&self) -> Option<&(dyn StdError + 'static)> {
26 match self {
27 Self::Utf8(ref err) => Some(err),
28 }
29 }
30}
31
/// Accumulates the fields of the event currently being parsed.
///
/// Every field starts out as `None` and is filled in as field lines arrive.
#[derive(Default)]
struct EventBuilder {
    /// Value of the most recent `event` field, if any.
    event_type: Option<String>,
    /// Values of all `data` fields seen so far, joined with `'\n'`.
    data: Option<String>,
    /// Value of the most recent accepted `id` field, if any.
    last_event_id: Option<String>,
}
38
39impl EventBuilder {
40 pub fn add_field(&mut self, name: &[u8], value_bs: &[u8]) -> Result<(), Error> {
41 let value = str::from_utf8(value_bs).map_err(Error::Utf8)?;
42
43 if name == &b"event"[..] {
44 self.event_type.replace(String::from(value));
46 } else if name == &b"data"[..] {
47 match &mut self.data {
55 Some(ref mut data) => {
56 data.reserve(value.len() + 1);
57 data.push('\n');
58 data.push_str(value);
59 }
60
61 None => {
62 self.data = Some(String::from(value));
63 }
64 }
65 } else if name == &b"id"[..] && !value.contains(NULL) {
66 self.last_event_id = Some(String::from(value));
69 } else if name == &b"retry"[..] && value.chars().all(|c| c.is_digit(10)) {
70 }
74
75 Ok(())
76 }
77
78 fn ready(&self) -> bool {
79 self.event_type.is_some() || self.data.is_some() || self.last_event_id.is_some()
80 }
81
82 fn build_and_clear(&mut self) -> Result<crate::Event, Error> {
83 Ok(crate::Event {
84 event: self.event_type.take().unwrap_or_else(String::new),
85 data: self.data.take().unwrap_or_else(String::new),
86 last_event_id: self.last_event_id.take(),
87 })
88 }
89}
90
91#[derive(Default)]
92pub struct Parser {
93 buf: BytesMut,
94 builder: EventBuilder,
95}
96
97impl Parser {
98 pub fn put(&mut self, bs: impl Buf) {
99 self.buf.put(bs)
100 }
101
102 pub fn next(&mut self) -> Option<Result<crate::Event, Error>> {
105 while let Some(line) = self.parse_line() {
108 if line.is_empty() && self.builder.ready() {
109 return Some(self.builder.build_and_clear());
110 }
111
112 match memchr(COLON, &line) {
114 Some(0) => {
116 continue;
117 }
118
119 Some(i) => {
120 let name = &line[0..i];
122
123 let value = if i + 1 < line.len() && line[i + 1] == b' ' {
126 &line[i + 2..]
127 } else {
128 &line[i + 1..]
129 };
130
131 if let Err(err) = self.builder.add_field(name, value) {
135 return Some(Err(err));
136 }
137 }
138
139 None => {
140 if let Err(err) = self.builder.add_field(&line[..], &[][..]) {
141 return Some(Err(err));
142 }
143 }
144 }
145 }
146
147 None
148 }
149
150 fn parse_line(&mut self) -> Option<Bytes> {
151 match memchr2(CR, LF, &self.buf) {
167 Some(i) => {
168 let line = self.buf.split_to(i);
169
170 if !self.buf.is_empty() {
171 if 2 < self.buf.len() && self.buf[0..2] == [CR, LF] {
172 self.buf.advance(2);
173 } else {
174 self.buf.advance(1);
175 }
176 }
177
178 Some(line.freeze())
179 }
180
181 None => None,
182 }
183 }
184
185 #[cfg(test)]
186 fn bytes(&self) -> &[u8] {
188 &self.buf
189 }
190}
191
192impl From<&[u8]> for Parser {
193 fn from(b: &[u8]) -> Self {
194 Self {
195 buf: BytesMut::from(b),
196 builder: EventBuilder::default(),
197 }
198 }
199}
200
201impl From<&str> for Parser {
202 fn from(s: &str) -> Self {
203 Self {
204 buf: BytesMut::from(s),
205 builder: EventBuilder::default(),
206 }
207 }
208}
209
#[cfg(test)]
mod tests {

    use super::*;

    /// A lone CR LF is consumed completely.
    #[test]
    fn buf_cleared_line_ending_with_crlf() {
        let mut p = Parser::from("\r\n");
        p.next();
        assert_eq!(p.bytes(), &[]);
    }

    /// A lone LF yields one empty line.
    #[test]
    fn single_lf_should_be_empty_line() {
        let mut p = Parser::from("\n");
        assert_eq!(p.parse_line().expect("parsing line"), &b""[..]);
    }

    /// A lone CR is consumed completely.
    #[test]
    fn buf_cleared_line_ending_with_cr() {
        let mut p = Parser::from("\r");
        p.next();
        assert_eq!(p.bytes(), &[]);
    }

    /// A lone LF is consumed completely.
    #[test]
    fn buf_cleared_line_ending_with_lf() {
        let mut p = Parser::from("\n");
        p.next();
        assert_eq!(p.bytes(), &[]);
    }

    /// Comment lines never produce an event.
    #[test]
    fn lines_starting_with_colon_are_ignored() {
        let mut p = Parser::from(":ok");
        assert!(p.next().is_none());
    }

    /// `memchr2` reports the first occurrence regardless of argument order.
    #[test]
    fn test_memchr_order() {
        let bs = &b"abcd\r\n"[..];
        assert_eq!(memchr2(CR, LF, bs), Some(4));
        assert_eq!(memchr2(LF, CR, bs), Some(4));
    }

    /// A field with nothing after the colon has an empty value.
    #[test]
    fn colon_as_last_char_in_row() {
        let mut p = Parser::from("data:\n\n");
        let ev = p.next().expect("Expected an event").expect("Should parse");
        assert_eq!(ev.event, "");
        assert_eq!(ev.data, "");
    }

    /// Bare `data` lines carry empty values; the unterminated last line is not
    /// dispatched.
    #[test]
    fn parse_example_2_events() {
        let mut p = Parser::from(
            r#"
data

data
data

data:"#,
        );

        let ev = p.next().expect("Event").expect("Parsed");
        assert_eq!(ev.event, "");
        assert_eq!(ev.data, "");

        let ev = p.next().expect("Event").expect("Parsed");
        assert_eq!(ev.event, "");
        assert_eq!(ev.data, "\n");

        assert!(p.next().is_none());
    }

    /// One optional space after the colon is stripped, so both events match.
    #[test]
    fn parse_two_identical_events() {
        let mut p = Parser::from(
            r#"
data:test

data: test

"#,
        );

        let ev = p
            .next()
            .expect("Expected first event")
            .expect("Should parse");

        assert_eq!(ev.event, "");
        assert_eq!(ev.data, "test");

        let ev = p
            .next()
            .expect("Expected second event")
            .expect("Should parse");

        assert_eq!(ev.event, "");
        assert_eq!(ev.data, "test");
    }

    /// Comment line, ids, an id without a value, and a value with two spaces
    /// after the colon (only the first space is stripped).
    #[test]
    fn parse_biggest_example_from_spec_page() {
        let mut p = Parser::from(
            r#"
: test stream

data: first event
id: 1

data:second event
id

data:  third event

"#,
        );

        let ev = p.next().expect("Event").expect("Parses");
        assert_eq!(ev.data, "first event");
        assert_eq!(ev.last_event_id.as_deref(), Some("1"));

        let ev = p.next().expect("Event").expect("Parses");
        assert_eq!(ev.data, "second event");
        assert_eq!(ev.last_event_id.as_deref(), Some(""));

        let ev = p.next().expect("Event").expect("Parses");
        assert_eq!(ev.data, " third event");
        assert_eq!(ev.last_event_id, None);
    }

    /// `BytesMut::split_to` returns the head and keeps the tail in place.
    #[test]
    fn buf_fiddle() {
        let mut buf = BytesMut::from("1234");

        let left = buf.split_to(1);
        assert_eq!(left, &b"1"[..]);
        assert_eq!(buf, &b"234"[..]);
    }
}