jmap_client/event_source/
parser.rs

1/*
2 * Copyright Stalwart Labs LLC See the COPYING
3 * file at the top-level directory of this distribution.
4 *
5 * Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 * https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 * <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
8 * option. This file may not be copied, modified, or distributed
9 * except according to those terms.
10 */
11
12use super::Changes;
13use crate::{
14    event_source::{CalendarAlert, PushNotification},
15    PushObject,
16};
17
18const MAX_EVENT_SIZE: usize = 1024 * 1024;
19
20#[derive(Debug, Copy, Clone, PartialEq, Eq)]
21pub enum EventType {
22    Ping,
23    State,
24    CalendarAlert,
25}
26
27impl Default for EventType {
28    fn default() -> Self {
29        Self::State
30    }
31}
32
33#[derive(Default, Debug)]
34pub struct Event {
35    pub event: EventType,
36    pub id: Vec<u8>,
37    pub data: Vec<u8>,
38}
39
40#[derive(Debug, Copy, Clone)]
41enum EventParserState {
42    Init,
43    Comment,
44    Field,
45    Value,
46}
47
48impl Default for EventParserState {
49    fn default() -> Self {
50        Self::Init
51    }
52}
53
54#[derive(Default, Debug)]
55pub struct EventParser {
56    state: EventParserState,
57    field: Vec<u8>,
58    value: Vec<u8>,
59    bytes: Option<Vec<u8>>,
60    pos: usize,
61    result: Event,
62}
63
64impl EventParser {
65    pub fn push_bytes(&mut self, bytes: Vec<u8>) {
66        self.bytes = Some(bytes);
67    }
68
69    pub fn needs_bytes(&self) -> bool {
70        self.bytes.is_none()
71    }
72
73    pub fn filter_notification(&mut self) -> Option<crate::Result<PushNotification>> {
74        #[allow(clippy::never_loop)]
75        #[allow(clippy::while_let_on_iterator)]
76        while let Some(event) = self.next() {
77            match event {
78                Ok(Event {
79                    event: EventType::State,
80                    data,
81                    id,
82                    ..
83                }) => {
84                    return match serde_json::from_slice::<PushObject>(&data) {
85                        Ok(PushObject::StateChange { changed }) => {
86                            Some(Ok(PushNotification::StateChange(Changes {
87                                id: if !id.is_empty() {
88                                    Some(String::from_utf8(id).unwrap_or_default())
89                                } else {
90                                    None
91                                },
92                                changes: changed,
93                            })))
94                        }
95                        Ok(unexpected) => Some(Err(crate::Error::Internal(format!(
96                            "Unexpected PushObject variant: {:?}",
97                            unexpected
98                        )))),
99                        Err(err) => Some(Err(err.into())),
100                    };
101                }
102                Ok(Event {
103                    event: EventType::CalendarAlert,
104                    data,
105                    ..
106                }) => {
107                    return match serde_json::from_slice::<CalendarAlert>(&data) {
108                        Ok(calendar_alert) => {
109                            Some(Ok(PushNotification::CalendarAlert(calendar_alert)))
110                        }
111                        Err(err) => Some(Err(err.into())),
112                    };
113                }
114                Ok(Event {
115                    event: EventType::Ping,
116                    #[cfg(feature = "debug")]
117                    id,
118                    ..
119                }) => {
120                    #[cfg(feature = "debug")]
121                    return Some(Ok(PushNotification::StateChange(Changes {
122                        id: if !id.is_empty() {
123                            Some(String::from_utf8(id).unwrap_or_default())
124                        } else {
125                            None
126                        },
127                        changes: ahash::AHashMap::from_iter([(
128                            "ping".to_string(),
129                            ahash::AHashMap::new(),
130                        )]),
131                    })));
132                }
133                Err(err) => return Some(Err(err)),
134            }
135        }
136        None
137    }
138}
139
140impl Iterator for EventParser {
141    type Item = crate::Result<Event>;
142
143    fn next(&mut self) -> Option<Self::Item> {
144        let bytes = self.bytes.as_ref()?;
145
146        for byte in bytes.get(self.pos..)? {
147            self.pos += 1;
148
149            match self.state {
150                EventParserState::Init => match byte {
151                    b':' => {
152                        self.state = EventParserState::Comment;
153                    }
154                    b'\r' | b' ' => (),
155                    b'\n' => {
156                        return Some(Ok(std::mem::take(&mut self.result)));
157                    }
158                    _ => {
159                        self.state = EventParserState::Field;
160                        self.field.push(*byte);
161                    }
162                },
163                EventParserState::Comment => {
164                    if *byte == b'\n' {
165                        self.state = EventParserState::Init;
166                    }
167                }
168                EventParserState::Field => match byte {
169                    b'\r' => (),
170                    b'\n' => {
171                        self.state = EventParserState::Init;
172                        self.field.clear();
173                    }
174                    b':' => {
175                        self.state = EventParserState::Value;
176                    }
177                    _ => {
178                        if self.field.len() >= MAX_EVENT_SIZE {
179                            return Some(Err(crate::Error::Internal(
180                                "EventSource response is too long.".to_string(),
181                            )));
182                        }
183
184                        self.field.push(*byte);
185                    }
186                },
187                EventParserState::Value => match byte {
188                    b'\r' => (),
189                    b' ' if self.value.is_empty() => (),
190                    b'\n' => {
191                        self.state = EventParserState::Init;
192                        match &self.field[..] {
193                            b"id" => {
194                                self.result.id.extend_from_slice(&self.value);
195                            }
196                            b"data" => {
197                                self.result.data.extend_from_slice(&self.value);
198                            }
199                            b"event" => match &self.value[..] {
200                                b"calendarAlert" => {
201                                    self.result.event = EventType::CalendarAlert;
202                                }
203                                b"ping" => {
204                                    self.result.event = EventType::Ping;
205                                }
206                                _ => {
207                                    self.result.event = EventType::State;
208                                }
209                            },
210                            _ => {
211                                //ignore
212                            }
213                        }
214                        self.field.clear();
215                        self.value.clear();
216                    }
217                    _ => {
218                        if (self.field.len() + self.value.len()) >= MAX_EVENT_SIZE {
219                            return Some(Err(crate::Error::Internal(
220                                "EventSource response is too long.".to_string(),
221                            )));
222                        }
223
224                        self.value.push(*byte);
225                    }
226                },
227            }
228        }
229
230        self.bytes = None;
231        self.pos = 0;
232
233        None
234    }
235}
236
237#[cfg(test)]
238mod tests {
239
240    use super::{Event, EventType};
241
242    #[derive(Debug, PartialEq, Eq)]
243    struct EventString {
244        event: EventType,
245        id: String,
246        data: String,
247    }
248
249    impl From<Event> for EventString {
250        fn from(event: Event) -> Self {
251            Self {
252                event: event.event,
253                id: String::from_utf8(event.id).unwrap(),
254                data: String::from_utf8(event.data).unwrap(),
255            }
256        }
257    }
258
259    #[test]
260    fn parse() {
261        let mut parser = super::EventParser::default();
262        let mut results = Vec::new();
263
264        for frame in [
265            Vec::from("event: state\nid:  0\ndata: test\n\n"),
266            Vec::from("event: ping\nid:123\ndata: ping pa"),
267            Vec::from("yload"),
268            Vec::from("\n\n"),
269            Vec::from(":comment\n\n"),
270            Vec::from("data: YHOO\n"),
271            Vec::from("data: +2\n"),
272            Vec::from("data: 10\n\n"),
273            Vec::from(": test stream\n"),
274            Vec::from("data: first event\n"),
275            Vec::from("id: 1\n\n"),
276            Vec::from("data:second event\n"),
277            Vec::from("id\n\n"),
278            Vec::from("data:  third event\n\n"),
279            Vec::from("data:hello\n\ndata: world\n\n"),
280        ] {
281            parser.push_bytes(frame);
282
283            #[allow(clippy::while_let_on_iterator)]
284            while let Some(event) = parser.next() {
285                results.push(EventString::from(event.unwrap()));
286            }
287        }
288
289        assert_eq!(
290            results,
291            vec![
292                EventString {
293                    event: EventType::State,
294                    id: "0".to_string(),
295                    data: "test".to_string()
296                },
297                EventString {
298                    event: EventType::Ping,
299                    id: "123".to_string(),
300                    data: "ping payload".to_string()
301                },
302                EventString {
303                    event: EventType::State,
304                    id: "".to_string(),
305                    data: "".to_string()
306                },
307                EventString {
308                    event: EventType::State,
309                    id: "".to_string(),
310                    data: "YHOO+210".to_string()
311                },
312                EventString {
313                    event: EventType::State,
314                    id: "1".to_string(),
315                    data: "first event".to_string()
316                },
317                EventString {
318                    event: EventType::State,
319                    id: "".to_string(),
320                    data: "second event".to_string()
321                },
322                EventString {
323                    event: EventType::State,
324                    id: "".to_string(),
325                    data: "third event".to_string()
326                },
327                EventString {
328                    event: EventType::State,
329                    id: "".to_string(),
330                    data: "hello".to_string()
331                },
332                EventString {
333                    event: EventType::State,
334                    id: "".to_string(),
335                    data: "world".to_string()
336                }
337            ]
338        );
339    }
340}