Skip to main content

jmap_client/event_source/
parser.rs

1/*
2 * Copyright Stalwart Labs LLC See the COPYING
3 * file at the top-level directory of this distribution.
4 *
5 * Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 * https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 * <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
8 * option. This file may not be copied, modified, or distributed
9 * except according to those terms.
10 */
11
12use super::Changes;
13use crate::{
14    event_source::{CalendarAlert, PushNotification},
15    PushObject,
16};
17
/// Size limit in bytes (1 MiB) enforced while parsing: a field name alone, or a
/// field name plus its value, may not grow beyond this many bytes.
const MAX_EVENT_SIZE: usize = 1 << 20;
19
/// Kind of a parsed event, selected by the `event:` field of the stream.
///
/// [`EventType::State`] is the default, so an event without an `event:` field is
/// reported as a state event.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum EventType {
    /// Selected by an `event: ping` line.
    Ping,
    /// Default kind; also selected by any `event:` value other than `ping` or
    /// `calendarAlert`.
    #[default]
    State,
    /// Selected by an `event: calendarAlert` line.
    CalendarAlert,
}
27
28#[derive(Default, Debug)]
29pub struct Event {
30    pub event: EventType,
31    pub id: Vec<u8>,
32    pub data: Vec<u8>,
33}
34
/// Position of the parser within the line currently being read.
#[derive(Clone, Copy, Debug, Default)]
enum EventParserState {
    /// At the start of a line; nothing significant has been read yet.
    #[default]
    Init,
    /// Inside a comment line (one that started with `:`); bytes are discarded
    /// until the end of the line.
    Comment,
    /// Reading the field name, i.e. everything before the first `:`.
    Field,
    /// Reading the field value, i.e. everything after the first `:`.
    Value,
}
43
44#[derive(Default, Debug)]
45pub struct EventParser {
46    state: EventParserState,
47    field: Vec<u8>,
48    value: Vec<u8>,
49    bytes: Option<Vec<u8>>,
50    pos: usize,
51    result: Event,
52}
53
54impl EventParser {
55    pub fn push_bytes(&mut self, bytes: Vec<u8>) {
56        self.bytes = Some(bytes);
57    }
58
59    pub fn needs_bytes(&self) -> bool {
60        self.bytes.is_none()
61    }
62
63    pub fn filter_notification(&mut self) -> Option<crate::Result<PushNotification>> {
64        #[allow(clippy::never_loop)]
65        #[allow(clippy::while_let_on_iterator)]
66        while let Some(event) = self.next() {
67            match event {
68                Ok(Event {
69                    event: EventType::State,
70                    data,
71                    id,
72                    ..
73                }) => {
74                    return match serde_json::from_slice::<PushObject>(&data) {
75                        Ok(PushObject::StateChange { changed }) => {
76                            Some(Ok(PushNotification::StateChange(Changes {
77                                id: if !id.is_empty() {
78                                    Some(String::from_utf8(id).unwrap_or_default())
79                                } else {
80                                    None
81                                },
82                                changes: changed,
83                            })))
84                        }
85                        Ok(unexpected) => Some(Err(crate::Error::Internal(format!(
86                            "Unexpected PushObject variant: {:?}",
87                            unexpected
88                        )))),
89                        Err(err) => Some(Err(err.into())),
90                    };
91                }
92                Ok(Event {
93                    event: EventType::CalendarAlert,
94                    data,
95                    ..
96                }) => {
97                    return match serde_json::from_slice::<CalendarAlert>(&data) {
98                        Ok(calendar_alert) => {
99                            Some(Ok(PushNotification::CalendarAlert(calendar_alert)))
100                        }
101                        Err(err) => Some(Err(err.into())),
102                    };
103                }
104                Ok(Event {
105                    event: EventType::Ping,
106                    #[cfg(feature = "debug")]
107                    id,
108                    ..
109                }) => {
110                    #[cfg(feature = "debug")]
111                    return Some(Ok(PushNotification::StateChange(Changes {
112                        id: if !id.is_empty() {
113                            Some(String::from_utf8(id).unwrap_or_default())
114                        } else {
115                            None
116                        },
117                        changes: ahash::AHashMap::from_iter([(
118                            "ping".to_string(),
119                            ahash::AHashMap::new(),
120                        )]),
121                    })));
122                }
123                Err(err) => return Some(Err(err)),
124            }
125        }
126        None
127    }
128}
129
130impl Iterator for EventParser {
131    type Item = crate::Result<Event>;
132
133    fn next(&mut self) -> Option<Self::Item> {
134        let bytes = self.bytes.as_ref()?;
135
136        for byte in bytes.get(self.pos..)? {
137            self.pos += 1;
138
139            match self.state {
140                EventParserState::Init => match byte {
141                    b':' => {
142                        self.state = EventParserState::Comment;
143                    }
144                    b'\r' | b' ' => (),
145                    b'\n' => {
146                        return Some(Ok(std::mem::take(&mut self.result)));
147                    }
148                    _ => {
149                        self.state = EventParserState::Field;
150                        self.field.push(*byte);
151                    }
152                },
153                EventParserState::Comment => {
154                    if *byte == b'\n' {
155                        self.state = EventParserState::Init;
156                    }
157                }
158                EventParserState::Field => match byte {
159                    b'\r' => (),
160                    b'\n' => {
161                        self.state = EventParserState::Init;
162                        self.field.clear();
163                    }
164                    b':' => {
165                        self.state = EventParserState::Value;
166                    }
167                    _ => {
168                        if self.field.len() >= MAX_EVENT_SIZE {
169                            return Some(Err(crate::Error::Internal(
170                                "EventSource response is too long.".to_string(),
171                            )));
172                        }
173
174                        self.field.push(*byte);
175                    }
176                },
177                EventParserState::Value => match byte {
178                    b'\r' => (),
179                    b' ' if self.value.is_empty() => (),
180                    b'\n' => {
181                        self.state = EventParserState::Init;
182                        match &self.field[..] {
183                            b"id" => {
184                                self.result.id.extend_from_slice(&self.value);
185                            }
186                            b"data" => {
187                                self.result.data.extend_from_slice(&self.value);
188                            }
189                            b"event" => match &self.value[..] {
190                                b"calendarAlert" => {
191                                    self.result.event = EventType::CalendarAlert;
192                                }
193                                b"ping" => {
194                                    self.result.event = EventType::Ping;
195                                }
196                                _ => {
197                                    self.result.event = EventType::State;
198                                }
199                            },
200                            _ => {
201                                //ignore
202                            }
203                        }
204                        self.field.clear();
205                        self.value.clear();
206                    }
207                    _ => {
208                        if (self.field.len() + self.value.len()) >= MAX_EVENT_SIZE {
209                            return Some(Err(crate::Error::Internal(
210                                "EventSource response is too long.".to_string(),
211                            )));
212                        }
213
214                        self.value.push(*byte);
215                    }
216                },
217            }
218        }
219
220        self.bytes = None;
221        self.pos = 0;
222
223        None
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::{Event, EventParser, EventType};

    /// Mirror of [`Event`] with string fields, so that expectations can be
    /// written as text and failures are printed readably.
    #[derive(Debug, PartialEq, Eq)]
    struct EventString {
        event: EventType,
        id: String,
        data: String,
    }

    impl From<Event> for EventString {
        fn from(parsed: Event) -> Self {
            let Event { event, id, data } = parsed;
            Self {
                event,
                id: String::from_utf8(id).unwrap(),
                data: String::from_utf8(data).unwrap(),
            }
        }
    }

    /// Shorthand for building an expected event.
    fn expected(event: EventType, id: &str, data: &str) -> EventString {
        EventString {
            event,
            id: id.to_string(),
            data: data.to_string(),
        }
    }

    /// Feeds a stream split at arbitrary points (including in the middle of a
    /// line) and checks the events produced, in order.
    #[test]
    fn parse() {
        let frames = [
            "event: state\nid:  0\ndata: test\n\n",
            "event: ping\nid:123\ndata: ping pa",
            "yload",
            "\n\n",
            ":comment\n\n",
            "data: YHOO\n",
            "data: +2\n",
            "data: 10\n\n",
            ": test stream\n",
            "data: first event\n",
            "id: 1\n\n",
            "data:second event\n",
            "id\n\n",
            "data:  third event\n\n",
            "data:hello\n\ndata: world\n\n",
        ];

        let mut parser = EventParser::default();
        let mut results = Vec::new();

        for frame in frames {
            parser.push_bytes(frame.as_bytes().to_vec());

            // Drain every event the chunk completes before supplying the next one.
            results.extend(
                parser
                    .by_ref()
                    .map(|event| EventString::from(event.unwrap())),
            );
        }

        assert_eq!(
            results,
            vec![
                expected(EventType::State, "0", "test"),
                expected(EventType::Ping, "123", "ping payload"),
                expected(EventType::State, "", ""),
                expected(EventType::State, "", "YHOO+210"),
                expected(EventType::State, "1", "first event"),
                expected(EventType::State, "", "second event"),
                expected(EventType::State, "", "third event"),
                expected(EventType::State, "", "hello"),
                expected(EventType::State, "", "world"),
            ]
        );
    }
}