jmap_client/event_source/
parser.rs1use super::Changes;
13use crate::{
14 event_source::{CalendarAlert, PushNotification},
15 PushObject,
16};
17
/// Upper bound, in bytes (1 MiB), that the parser tolerates for buffered
/// EventSource input before it reports the response as too long.
const MAX_EVENT_SIZE: usize = 1 << 20;
19
/// Kind of event announced by the `event:` field of an EventSource message.
///
/// The parser maps the value `ping` to [`EventType::Ping`], `calendarAlert`
/// to [`EventType::CalendarAlert`], and every other value (or a missing
/// `event:` field) to [`EventType::State`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum EventType {
    /// An `event: ping` message.
    Ping,
    /// Default kind, used when no other kind was recognised.
    #[default]
    State,
    /// An `event: calendarAlert` message.
    CalendarAlert,
}
27
28#[derive(Default, Debug)]
29pub struct Event {
30 pub event: EventType,
31 pub id: Vec<u8>,
32 pub data: Vec<u8>,
33}
34
/// Position of the parser within the current line of the stream.
#[derive(Clone, Copy, Debug, Default)]
enum EventParserState {
    /// At the start of a line; nothing of the line has been classified yet.
    #[default]
    Init,
    /// Inside a comment line (one that started with `:`).
    Comment,
    /// Reading a field name, up to the `:` separator.
    Field,
    /// Reading a field value, up to the end of the line.
    Value,
}
43
44#[derive(Default, Debug)]
45pub struct EventParser {
46 state: EventParserState,
47 field: Vec<u8>,
48 value: Vec<u8>,
49 bytes: Option<Vec<u8>>,
50 pos: usize,
51 result: Event,
52}
53
54impl EventParser {
55 pub fn push_bytes(&mut self, bytes: Vec<u8>) {
56 self.bytes = Some(bytes);
57 }
58
59 pub fn needs_bytes(&self) -> bool {
60 self.bytes.is_none()
61 }
62
63 pub fn filter_notification(&mut self) -> Option<crate::Result<PushNotification>> {
64 #[allow(clippy::never_loop)]
65 #[allow(clippy::while_let_on_iterator)]
66 while let Some(event) = self.next() {
67 match event {
68 Ok(Event {
69 event: EventType::State,
70 data,
71 id,
72 ..
73 }) => {
74 return match serde_json::from_slice::<PushObject>(&data) {
75 Ok(PushObject::StateChange { changed }) => {
76 Some(Ok(PushNotification::StateChange(Changes {
77 id: if !id.is_empty() {
78 Some(String::from_utf8(id).unwrap_or_default())
79 } else {
80 None
81 },
82 changes: changed,
83 })))
84 }
85 Ok(unexpected) => Some(Err(crate::Error::Internal(format!(
86 "Unexpected PushObject variant: {:?}",
87 unexpected
88 )))),
89 Err(err) => Some(Err(err.into())),
90 };
91 }
92 Ok(Event {
93 event: EventType::CalendarAlert,
94 data,
95 ..
96 }) => {
97 return match serde_json::from_slice::<CalendarAlert>(&data) {
98 Ok(calendar_alert) => {
99 Some(Ok(PushNotification::CalendarAlert(calendar_alert)))
100 }
101 Err(err) => Some(Err(err.into())),
102 };
103 }
104 Ok(Event {
105 event: EventType::Ping,
106 #[cfg(feature = "debug")]
107 id,
108 ..
109 }) => {
110 #[cfg(feature = "debug")]
111 return Some(Ok(PushNotification::StateChange(Changes {
112 id: if !id.is_empty() {
113 Some(String::from_utf8(id).unwrap_or_default())
114 } else {
115 None
116 },
117 changes: ahash::AHashMap::from_iter([(
118 "ping".to_string(),
119 ahash::AHashMap::new(),
120 )]),
121 })));
122 }
123 Err(err) => return Some(Err(err)),
124 }
125 }
126 None
127 }
128}
129
130impl Iterator for EventParser {
131 type Item = crate::Result<Event>;
132
133 fn next(&mut self) -> Option<Self::Item> {
134 let bytes = self.bytes.as_ref()?;
135
136 for byte in bytes.get(self.pos..)? {
137 self.pos += 1;
138
139 match self.state {
140 EventParserState::Init => match byte {
141 b':' => {
142 self.state = EventParserState::Comment;
143 }
144 b'\r' | b' ' => (),
145 b'\n' => {
146 return Some(Ok(std::mem::take(&mut self.result)));
147 }
148 _ => {
149 self.state = EventParserState::Field;
150 self.field.push(*byte);
151 }
152 },
153 EventParserState::Comment => {
154 if *byte == b'\n' {
155 self.state = EventParserState::Init;
156 }
157 }
158 EventParserState::Field => match byte {
159 b'\r' => (),
160 b'\n' => {
161 self.state = EventParserState::Init;
162 self.field.clear();
163 }
164 b':' => {
165 self.state = EventParserState::Value;
166 }
167 _ => {
168 if self.field.len() >= MAX_EVENT_SIZE {
169 return Some(Err(crate::Error::Internal(
170 "EventSource response is too long.".to_string(),
171 )));
172 }
173
174 self.field.push(*byte);
175 }
176 },
177 EventParserState::Value => match byte {
178 b'\r' => (),
179 b' ' if self.value.is_empty() => (),
180 b'\n' => {
181 self.state = EventParserState::Init;
182 match &self.field[..] {
183 b"id" => {
184 self.result.id.extend_from_slice(&self.value);
185 }
186 b"data" => {
187 self.result.data.extend_from_slice(&self.value);
188 }
189 b"event" => match &self.value[..] {
190 b"calendarAlert" => {
191 self.result.event = EventType::CalendarAlert;
192 }
193 b"ping" => {
194 self.result.event = EventType::Ping;
195 }
196 _ => {
197 self.result.event = EventType::State;
198 }
199 },
200 _ => {
201 }
203 }
204 self.field.clear();
205 self.value.clear();
206 }
207 _ => {
208 if (self.field.len() + self.value.len()) >= MAX_EVENT_SIZE {
209 return Some(Err(crate::Error::Internal(
210 "EventSource response is too long.".to_string(),
211 )));
212 }
213
214 self.value.push(*byte);
215 }
216 },
217 }
218 }
219
220 self.bytes = None;
221 self.pos = 0;
222
223 None
224 }
225}
226
#[cfg(test)]
mod tests {

    use super::{Event, EventParser, EventType};

    /// UTF-8 view of an [`Event`], so expectations can be written as strings.
    #[derive(Debug, PartialEq, Eq)]
    struct EventString {
        event: EventType,
        id: String,
        data: String,
    }

    impl From<Event> for EventString {
        fn from(event: Event) -> Self {
            let Event { event, id, data } = event;
            Self {
                event,
                id: String::from_utf8(id).unwrap(),
                data: String::from_utf8(data).unwrap(),
            }
        }
    }

    /// Shorthand for building an expected event.
    fn expected(event: EventType, id: &str, data: &str) -> EventString {
        EventString {
            event,
            id: id.to_string(),
            data: data.to_string(),
        }
    }

    #[test]
    fn parse() {
        // Each entry is delivered to the parser as a separate chunk, so
        // several messages deliberately straddle chunk boundaries.
        let frames = [
            "event: state\nid: 0\ndata: test\n\n",
            "event: ping\nid:123\ndata: ping pa",
            "yload",
            "\n\n",
            ":comment\n\n",
            "data: YHOO\n",
            "data: +2\n",
            "data: 10\n\n",
            ": test stream\n",
            "data: first event\n",
            "id: 1\n\n",
            "data:second event\n",
            "id\n\n",
            "data: third event\n\n",
            "data:hello\n\ndata: world\n\n",
        ];

        let mut parser = EventParser::default();
        let mut results = Vec::new();

        for frame in frames {
            parser.push_bytes(frame.as_bytes().to_vec());

            for event in parser.by_ref() {
                results.push(EventString::from(event.unwrap()));
            }
        }

        assert_eq!(
            results,
            vec![
                expected(EventType::State, "0", "test"),
                expected(EventType::Ping, "123", "ping payload"),
                expected(EventType::State, "", ""),
                expected(EventType::State, "", "YHOO+210"),
                expected(EventType::State, "1", "first event"),
                expected(EventType::State, "", "second event"),
                expected(EventType::State, "", "third event"),
                expected(EventType::State, "", "hello"),
                expected(EventType::State, "", "world"),
            ]
        );
    }
}