jmap_client/event_source/
parser.rs1use super::Changes;
13use crate::{
14 event_source::{CalendarAlert, PushNotification},
15 PushObject,
16};
17
/// Upper bound, in bytes (1 MiB), on the field name and on the combined
/// field name plus value that the parser buffers for a single line.
const MAX_EVENT_SIZE: usize = 1 << 20;
19
/// Kind of an event, as selected by the `event` field of the stream.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EventType {
    /// Selected by `event: ping`.
    Ping,
    /// Selected by any other `event` value; also the default.
    State,
    /// Selected by `event: calendarAlert`.
    CalendarAlert,
}

impl Default for EventType {
    /// An event that never names its type is a state event.
    fn default() -> Self {
        EventType::State
    }
}
32
33#[derive(Default, Debug)]
34pub struct Event {
35 pub event: EventType,
36 pub id: Vec<u8>,
37 pub data: Vec<u8>,
38}
39
/// Position of the parser within the current line of the stream.
#[derive(Clone, Copy, Debug)]
enum EventParserState {
    /// At the start of a line.
    Init,
    /// Inside a line that began with `:`; skipped up to the next newline.
    Comment,
    /// Reading a field name, up to `:` or the end of the line.
    Field,
    /// Reading the value that follows the `:` of a field.
    Value,
}

impl Default for EventParserState {
    /// A fresh parser starts at the beginning of a line.
    fn default() -> Self {
        EventParserState::Init
    }
}
53
54#[derive(Default, Debug)]
55pub struct EventParser {
56 state: EventParserState,
57 field: Vec<u8>,
58 value: Vec<u8>,
59 bytes: Option<Vec<u8>>,
60 pos: usize,
61 result: Event,
62}
63
64impl EventParser {
65 pub fn push_bytes(&mut self, bytes: Vec<u8>) {
66 self.bytes = Some(bytes);
67 }
68
69 pub fn needs_bytes(&self) -> bool {
70 self.bytes.is_none()
71 }
72
73 pub fn filter_notification(&mut self) -> Option<crate::Result<PushNotification>> {
74 #[allow(clippy::never_loop)]
75 #[allow(clippy::while_let_on_iterator)]
76 while let Some(event) = self.next() {
77 match event {
78 Ok(Event {
79 event: EventType::State,
80 data,
81 id,
82 ..
83 }) => {
84 return match serde_json::from_slice::<PushObject>(&data) {
85 Ok(PushObject::StateChange { changed }) => {
86 Some(Ok(PushNotification::StateChange(Changes {
87 id: if !id.is_empty() {
88 Some(String::from_utf8(id).unwrap_or_default())
89 } else {
90 None
91 },
92 changes: changed,
93 })))
94 }
95 Ok(unexpected) => Some(Err(crate::Error::Internal(format!(
96 "Unexpected PushObject variant: {:?}",
97 unexpected
98 )))),
99 Err(err) => Some(Err(err.into())),
100 };
101 }
102 Ok(Event {
103 event: EventType::CalendarAlert,
104 data,
105 ..
106 }) => {
107 return match serde_json::from_slice::<CalendarAlert>(&data) {
108 Ok(calendar_alert) => {
109 Some(Ok(PushNotification::CalendarAlert(calendar_alert)))
110 }
111 Err(err) => Some(Err(err.into())),
112 };
113 }
114 Ok(Event {
115 event: EventType::Ping,
116 #[cfg(feature = "debug")]
117 id,
118 ..
119 }) => {
120 #[cfg(feature = "debug")]
121 return Some(Ok(PushNotification::StateChange(Changes {
122 id: if !id.is_empty() {
123 Some(String::from_utf8(id).unwrap_or_default())
124 } else {
125 None
126 },
127 changes: ahash::AHashMap::from_iter([(
128 "ping".to_string(),
129 ahash::AHashMap::new(),
130 )]),
131 })));
132 }
133 Err(err) => return Some(Err(err)),
134 }
135 }
136 None
137 }
138}
139
140impl Iterator for EventParser {
141 type Item = crate::Result<Event>;
142
143 fn next(&mut self) -> Option<Self::Item> {
144 let bytes = self.bytes.as_ref()?;
145
146 for byte in bytes.get(self.pos..)? {
147 self.pos += 1;
148
149 match self.state {
150 EventParserState::Init => match byte {
151 b':' => {
152 self.state = EventParserState::Comment;
153 }
154 b'\r' | b' ' => (),
155 b'\n' => {
156 return Some(Ok(std::mem::take(&mut self.result)));
157 }
158 _ => {
159 self.state = EventParserState::Field;
160 self.field.push(*byte);
161 }
162 },
163 EventParserState::Comment => {
164 if *byte == b'\n' {
165 self.state = EventParserState::Init;
166 }
167 }
168 EventParserState::Field => match byte {
169 b'\r' => (),
170 b'\n' => {
171 self.state = EventParserState::Init;
172 self.field.clear();
173 }
174 b':' => {
175 self.state = EventParserState::Value;
176 }
177 _ => {
178 if self.field.len() >= MAX_EVENT_SIZE {
179 return Some(Err(crate::Error::Internal(
180 "EventSource response is too long.".to_string(),
181 )));
182 }
183
184 self.field.push(*byte);
185 }
186 },
187 EventParserState::Value => match byte {
188 b'\r' => (),
189 b' ' if self.value.is_empty() => (),
190 b'\n' => {
191 self.state = EventParserState::Init;
192 match &self.field[..] {
193 b"id" => {
194 self.result.id.extend_from_slice(&self.value);
195 }
196 b"data" => {
197 self.result.data.extend_from_slice(&self.value);
198 }
199 b"event" => match &self.value[..] {
200 b"calendarAlert" => {
201 self.result.event = EventType::CalendarAlert;
202 }
203 b"ping" => {
204 self.result.event = EventType::Ping;
205 }
206 _ => {
207 self.result.event = EventType::State;
208 }
209 },
210 _ => {
211 }
213 }
214 self.field.clear();
215 self.value.clear();
216 }
217 _ => {
218 if (self.field.len() + self.value.len()) >= MAX_EVENT_SIZE {
219 return Some(Err(crate::Error::Internal(
220 "EventSource response is too long.".to_string(),
221 )));
222 }
223
224 self.value.push(*byte);
225 }
226 },
227 }
228 }
229
230 self.bytes = None;
231 self.pos = 0;
232
233 None
234 }
235}
236
#[cfg(test)]
mod tests {

    use super::{Event, EventParser, EventType};

    /// Text view of an [`Event`], so that failed assertions print readable
    /// strings instead of byte vectors.
    #[derive(Debug, PartialEq, Eq)]
    struct EventString {
        event: EventType,
        id: String,
        data: String,
    }

    impl From<Event> for EventString {
        fn from(event: Event) -> Self {
            let Event { event, id, data } = event;
            EventString {
                event,
                id: String::from_utf8(id).unwrap(),
                data: String::from_utf8(data).unwrap(),
            }
        }
    }

    /// Feeds the stream in arbitrary chunks and checks every event produced.
    #[test]
    fn parse() {
        let frames = [
            "event: state\nid: 0\ndata: test\n\n",
            "event: ping\nid:123\ndata: ping pa",
            "yload",
            "\n\n",
            ":comment\n\n",
            "data: YHOO\n",
            "data: +2\n",
            "data: 10\n\n",
            ": test stream\n",
            "data: first event\n",
            "id: 1\n\n",
            "data:second event\n",
            "id\n\n",
            "data: third event\n\n",
            "data:hello\n\ndata: world\n\n",
        ];

        // (event type, id, data) of each event, in stream order.
        let expected = [
            (EventType::State, "0", "test"),
            (EventType::Ping, "123", "ping payload"),
            (EventType::State, "", ""),
            (EventType::State, "", "YHOO+210"),
            (EventType::State, "1", "first event"),
            (EventType::State, "", "second event"),
            (EventType::State, "", "third event"),
            (EventType::State, "", "hello"),
            (EventType::State, "", "world"),
        ];

        let mut parser = EventParser::default();
        let mut results = Vec::new();

        for frame in frames.iter() {
            parser.push_bytes(frame.as_bytes().to_vec());
            // Drain every event the chunk completes before pushing more.
            results.extend(
                parser
                    .by_ref()
                    .map(|event| EventString::from(event.unwrap())),
            );
        }

        let expected: Vec<EventString> = expected
            .iter()
            .map(|&(event, id, data)| EventString {
                event,
                id: id.to_string(),
                data: data.to_string(),
            })
            .collect();

        assert_eq!(results, expected);
    }
}