jmap_client/event_source/
parser.rs

1/*
2 * Copyright Stalwart Labs LLC See the COPYING
3 * file at the top-level directory of this distribution.
4 *
5 * Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 * https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 * <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
8 * option. This file may not be copied, modified, or distributed
9 * except according to those terms.
10 */
11
12use crate::StateChange;
13
14use super::Changes;
15
/// Upper bound, in bytes, on how much field/value data the parser buffers
/// before it reports "EventSource response is too long." (1 MiB).
const MAX_EVENT_SIZE: usize = 1 << 20;
17
/// Kind of server-sent event recognised by the parser.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EventType {
    /// Selected when an `event:` line carries exactly the value `ping`.
    Ping,
    /// Selected for every other `event:` value, and when no `event:` line
    /// was seen at all.
    State,
}

impl Default for EventType {
    /// An event is a state change unless explicitly marked as a ping.
    fn default() -> Self {
        EventType::State
    }
}
29
30#[derive(Default, Debug)]
31pub struct Event {
32    pub event: EventType,
33    pub id: Vec<u8>,
34    pub data: Vec<u8>,
35}
36
/// Position of the parser within the current line of the stream.
#[derive(Clone, Copy, Debug)]
enum EventParserState {
    /// At the start of a line, before any significant byte was seen.
    Init,
    /// Inside a line that started with `:`; everything up to `\n` is skipped.
    Comment,
    /// Collecting the field name (bytes before the first `:`).
    Field,
    /// Collecting the field value (bytes after the `:`).
    Value,
}

impl Default for EventParserState {
    /// Parsing always begins at the start of a line.
    fn default() -> Self {
        EventParserState::Init
    }
}
50
51#[derive(Default, Debug)]
52pub struct EventParser {
53    state: EventParserState,
54    field: Vec<u8>,
55    value: Vec<u8>,
56    bytes: Option<Vec<u8>>,
57    pos: usize,
58    result: Event,
59}
60
61impl EventParser {
62    pub fn push_bytes(&mut self, bytes: Vec<u8>) {
63        self.bytes = Some(bytes);
64    }
65
66    pub fn needs_bytes(&self) -> bool {
67        self.bytes.is_none()
68    }
69
70    pub fn filter_state(&mut self) -> Option<crate::Result<Changes>> {
71        #[allow(clippy::never_loop)]
72        #[allow(clippy::while_let_on_iterator)]
73        while let Some(event) = self.next() {
74            match event {
75                Ok(Event {
76                    event: EventType::State,
77                    data,
78                    id,
79                    ..
80                }) => {
81                    return match serde_json::from_slice::<StateChange>(&data) {
82                        Ok(state_change) => Some(Ok(Changes {
83                            id: if !id.is_empty() {
84                                Some(String::from_utf8(id).unwrap_or_default())
85                            } else {
86                                None
87                            },
88                            changes: state_change.changed,
89                        })),
90                        Err(err) => Some(Err(err.into())),
91                    };
92                }
93                Ok(Event {
94                    event: EventType::Ping,
95                    #[cfg(feature = "debug")]
96                    id,
97                    ..
98                }) => {
99                    #[cfg(feature = "debug")]
100                    return Some(Ok(Changes {
101                        id: if !id.is_empty() {
102                            Some(String::from_utf8(id).unwrap_or_default())
103                        } else {
104                            None
105                        },
106                        changes: ahash::AHashMap::from_iter([(
107                            "ping".to_string(),
108                            ahash::AHashMap::new(),
109                        )]),
110                    }));
111                }
112                Err(err) => return Some(Err(err)),
113            }
114        }
115        None
116    }
117}
118
119impl Iterator for EventParser {
120    type Item = crate::Result<Event>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        let bytes = self.bytes.as_ref()?;
124
125        for byte in bytes.get(self.pos..)? {
126            self.pos += 1;
127
128            match self.state {
129                EventParserState::Init => match byte {
130                    b':' => {
131                        self.state = EventParserState::Comment;
132                    }
133                    b'\r' | b' ' => (),
134                    b'\n' => {
135                        return Some(Ok(std::mem::take(&mut self.result)));
136                    }
137                    _ => {
138                        self.state = EventParserState::Field;
139                        self.field.push(*byte);
140                    }
141                },
142                EventParserState::Comment => {
143                    if *byte == b'\n' {
144                        self.state = EventParserState::Init;
145                    }
146                }
147                EventParserState::Field => match byte {
148                    b'\r' => (),
149                    b'\n' => {
150                        self.state = EventParserState::Init;
151                        self.field.clear();
152                    }
153                    b':' => {
154                        self.state = EventParserState::Value;
155                    }
156                    _ => {
157                        if self.field.len() >= MAX_EVENT_SIZE {
158                            return Some(Err(crate::Error::Internal(
159                                "EventSource response is too long.".to_string(),
160                            )));
161                        }
162
163                        self.field.push(*byte);
164                    }
165                },
166                EventParserState::Value => match byte {
167                    b'\r' => (),
168                    b' ' if self.value.is_empty() => (),
169                    b'\n' => {
170                        self.state = EventParserState::Init;
171                        match &self.field[..] {
172                            b"id" => {
173                                self.result.id.extend_from_slice(&self.value);
174                            }
175                            b"data" => {
176                                self.result.data.extend_from_slice(&self.value);
177                            }
178                            b"event" => {
179                                if self.value == b"ping" {
180                                    self.result.event = EventType::Ping;
181                                } else {
182                                    self.result.event = EventType::State;
183                                }
184                            }
185                            _ => {
186                                //ignore
187                            }
188                        }
189                        self.field.clear();
190                        self.value.clear();
191                    }
192                    _ => {
193                        if (self.field.len() + self.value.len()) >= MAX_EVENT_SIZE {
194                            return Some(Err(crate::Error::Internal(
195                                "EventSource response is too long.".to_string(),
196                            )));
197                        }
198
199                        self.value.push(*byte);
200                    }
201                },
202            }
203        }
204
205        self.bytes = None;
206        self.pos = 0;
207
208        None
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::{Event, EventParser, EventType};

    /// String-typed mirror of `Event`, so that assertion failures print
    /// readable text instead of byte vectors.
    #[derive(Debug, PartialEq, Eq)]
    struct EventString {
        event: EventType,
        id: String,
        data: String,
    }

    impl From<Event> for EventString {
        fn from(event: Event) -> Self {
            let Event { event, id, data } = event;
            EventString {
                event,
                id: String::from_utf8(id).unwrap(),
                data: String::from_utf8(data).unwrap(),
            }
        }
    }

    /// Feeds a stream split at arbitrary points (including mid-line) and
    /// checks the sequence of events the parser produces.
    #[test]
    fn parse() {
        let frames = [
            "event: state\nid:  0\ndata: test\n\n",
            "event: ping\nid:123\ndata: ping pa",
            "yload",
            "\n\n",
            ":comment\n\n",
            "data: YHOO\n",
            "data: +2\n",
            "data: 10\n\n",
            ": test stream\n",
            "data: first event\n",
            "id: 1\n\n",
            "data:second event\n",
            "id\n\n",
            "data:  third event\n\n",
            "data:hello\n\ndata: world\n\n",
        ];

        let expected: Vec<EventString> = [
            (EventType::State, "0", "test"),
            (EventType::Ping, "123", "ping payload"),
            (EventType::State, "", ""),
            (EventType::State, "", "YHOO+210"),
            (EventType::State, "1", "first event"),
            (EventType::State, "", "second event"),
            (EventType::State, "", "third event"),
            (EventType::State, "", "hello"),
            (EventType::State, "", "world"),
        ]
        .into_iter()
        .map(|(event, id, data)| EventString {
            event,
            id: id.to_string(),
            data: data.to_string(),
        })
        .collect();

        let mut parser = EventParser::default();
        let mut results = Vec::new();

        for frame in frames {
            parser.push_bytes(frame.as_bytes().to_vec());

            // Drain every event the new chunk completes; the parser is
            // reused for the next frame, hence `by_ref`.
            for event in parser.by_ref() {
                results.push(EventString::from(event.unwrap()));
            }
        }

        assert_eq!(results, expected);
    }
}