// jmap_client/event_source/parser.rs
use crate::StateChange;
13
14use super::Changes;
15
/// Size limit, in bytes (1 MiB), applied while a single line is accumulated:
/// to the field name on its own, and to the field name plus its value.
const MAX_EVENT_SIZE: usize = 1 << 20;
17
/// Kind of message carried by a parsed [`Event`].
///
/// The default is [`EventType::State`], which is also what the parser assigns
/// for any `event` field whose value is not `ping`.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum EventType {
    /// The `event` field's value was exactly `ping`.
    Ping,
    /// Any other `event` value, or no `event` field at all.
    #[default]
    State,
}
29
30#[derive(Default, Debug)]
31pub struct Event {
32 pub event: EventType,
33 pub id: Vec<u8>,
34 pub data: Vec<u8>,
35}
36
/// Where the parser currently is within the line being read.
#[derive(Debug, Default, Copy, Clone)]
enum EventParserState {
    /// At the start of a line, before any significant byte (initial state).
    #[default]
    Init,
    /// Inside a line that started with `:`; everything up to `\n` is dropped.
    Comment,
    /// Collecting a field name, up to `:` or the end of the line.
    Field,
    /// Collecting the value that follows `field:`.
    Value,
}
50
51#[derive(Default, Debug)]
52pub struct EventParser {
53 state: EventParserState,
54 field: Vec<u8>,
55 value: Vec<u8>,
56 bytes: Option<Vec<u8>>,
57 pos: usize,
58 result: Event,
59}
60
61impl EventParser {
62 pub fn push_bytes(&mut self, bytes: Vec<u8>) {
63 self.bytes = Some(bytes);
64 }
65
66 pub fn needs_bytes(&self) -> bool {
67 self.bytes.is_none()
68 }
69
70 pub fn filter_state(&mut self) -> Option<crate::Result<Changes>> {
71 #[allow(clippy::never_loop)]
72 #[allow(clippy::while_let_on_iterator)]
73 while let Some(event) = self.next() {
74 match event {
75 Ok(Event {
76 event: EventType::State,
77 data,
78 id,
79 ..
80 }) => {
81 return match serde_json::from_slice::<StateChange>(&data) {
82 Ok(state_change) => Some(Ok(Changes {
83 id: if !id.is_empty() {
84 Some(String::from_utf8(id).unwrap_or_default())
85 } else {
86 None
87 },
88 changes: state_change.changed,
89 })),
90 Err(err) => Some(Err(err.into())),
91 };
92 }
93 Ok(Event {
94 event: EventType::Ping,
95 #[cfg(feature = "debug")]
96 id,
97 ..
98 }) => {
99 #[cfg(feature = "debug")]
100 return Some(Ok(Changes {
101 id: if !id.is_empty() {
102 Some(String::from_utf8(id).unwrap_or_default())
103 } else {
104 None
105 },
106 changes: ahash::AHashMap::from_iter([(
107 "ping".to_string(),
108 ahash::AHashMap::new(),
109 )]),
110 }));
111 }
112 Err(err) => return Some(Err(err)),
113 }
114 }
115 None
116 }
117}
118
119impl Iterator for EventParser {
120 type Item = crate::Result<Event>;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 let bytes = self.bytes.as_ref()?;
124
125 for byte in bytes.get(self.pos..)? {
126 self.pos += 1;
127
128 match self.state {
129 EventParserState::Init => match byte {
130 b':' => {
131 self.state = EventParserState::Comment;
132 }
133 b'\r' | b' ' => (),
134 b'\n' => {
135 return Some(Ok(std::mem::take(&mut self.result)));
136 }
137 _ => {
138 self.state = EventParserState::Field;
139 self.field.push(*byte);
140 }
141 },
142 EventParserState::Comment => {
143 if *byte == b'\n' {
144 self.state = EventParserState::Init;
145 }
146 }
147 EventParserState::Field => match byte {
148 b'\r' => (),
149 b'\n' => {
150 self.state = EventParserState::Init;
151 self.field.clear();
152 }
153 b':' => {
154 self.state = EventParserState::Value;
155 }
156 _ => {
157 if self.field.len() >= MAX_EVENT_SIZE {
158 return Some(Err(crate::Error::Internal(
159 "EventSource response is too long.".to_string(),
160 )));
161 }
162
163 self.field.push(*byte);
164 }
165 },
166 EventParserState::Value => match byte {
167 b'\r' => (),
168 b' ' if self.value.is_empty() => (),
169 b'\n' => {
170 self.state = EventParserState::Init;
171 match &self.field[..] {
172 b"id" => {
173 self.result.id.extend_from_slice(&self.value);
174 }
175 b"data" => {
176 self.result.data.extend_from_slice(&self.value);
177 }
178 b"event" => {
179 if self.value == b"ping" {
180 self.result.event = EventType::Ping;
181 } else {
182 self.result.event = EventType::State;
183 }
184 }
185 _ => {
186 }
188 }
189 self.field.clear();
190 self.value.clear();
191 }
192 _ => {
193 if (self.field.len() + self.value.len()) >= MAX_EVENT_SIZE {
194 return Some(Err(crate::Error::Internal(
195 "EventSource response is too long.".to_string(),
196 )));
197 }
198
199 self.value.push(*byte);
200 }
201 },
202 }
203 }
204
205 self.bytes = None;
206 self.pos = 0;
207
208 None
209 }
210}
211
#[cfg(test)]
mod tests {

    use super::{Event, EventType};

    /// An `Event` with its byte buffers decoded as UTF-8, so that expectations
    /// can be written with string literals.
    #[derive(Debug, PartialEq, Eq)]
    struct EventString {
        event: EventType,
        id: String,
        data: String,
    }

    impl From<Event> for EventString {
        fn from(event: Event) -> Self {
            let Event { event, id, data } = event;
            Self {
                event,
                id: String::from_utf8(id).unwrap(),
                data: String::from_utf8(data).unwrap(),
            }
        }
    }

    /// Builds one expected entry.
    fn expected(event: EventType, id: &str, data: &str) -> EventString {
        EventString {
            event,
            id: id.to_string(),
            data: data.to_string(),
        }
    }

    #[test]
    fn parse() {
        // Input split at arbitrary points, including in the middle of a value.
        let frames = [
            "event: state\nid: 0\ndata: test\n\n",
            "event: ping\nid:123\ndata: ping pa",
            "yload",
            "\n\n",
            ":comment\n\n",
            "data: YHOO\n",
            "data: +2\n",
            "data: 10\n\n",
            ": test stream\n",
            "data: first event\n",
            "id: 1\n\n",
            "data:second event\n",
            "id\n\n",
            "data: third event\n\n",
            "data:hello\n\ndata: world\n\n",
        ];

        let mut parser = super::EventParser::default();
        let mut results = Vec::new();

        for frame in frames.iter() {
            parser.push_bytes(frame.as_bytes().to_vec());

            // Drain every event completed by this frame.
            for parsed in &mut parser {
                results.push(EventString::from(parsed.unwrap()));
            }
        }

        assert_eq!(
            results,
            vec![
                expected(EventType::State, "0", "test"),
                expected(EventType::Ping, "123", "ping payload"),
                expected(EventType::State, "", ""),
                expected(EventType::State, "", "YHOO+210"),
                expected(EventType::State, "1", "first event"),
                expected(EventType::State, "", "second event"),
                expected(EventType::State, "", "third event"),
                expected(EventType::State, "", "hello"),
                expected(EventType::State, "", "world"),
            ]
        );
    }
}