1use crate::error::Error::{InvalidJson, InvalidUtf8};
2use crate::error::{Error, Result};
3use bytes::{BufMut, Bytes, BytesMut};
4use serde_json::Value;
5use std::{
6 convert::TryFrom,
7 sync::atomic::{AtomicUsize, Ordering},
8};
9
10#[derive(Debug, Copy, Clone, Eq, PartialEq)]
12pub enum PacketType {
13 Connect = 0,
14 Disconnect = 1,
15 Event = 2,
16 Ack = 3,
17 ConnectError = 4,
18 BinaryEvent = 5,
19 BinaryAck = 6,
20}
21
22#[derive(Debug, PartialEq, Eq, Clone)]
24pub struct Packet {
25 pub ptype: PacketType,
26 pub nsp: String,
27 pub data: Option<Value>,
28 pub id: Option<usize>,
29 pub attachment_count: u8,
30 pub attachments: Option<Vec<Bytes>>,
31}
32
33impl Default for Packet {
34 fn default() -> Self {
35 Self {
36 ptype: PacketType::Event,
37 nsp: String::from("/"),
38 data: None,
39 id: None,
40 attachment_count: 0,
41 attachments: None,
42 }
43 }
44}
45
46impl TryFrom<u8> for PacketType {
47 type Error = Error;
48 fn try_from(b: u8) -> Result<Self> {
49 PacketType::try_from(b as char)
50 }
51}
52
53impl TryFrom<char> for PacketType {
54 type Error = Error;
55 fn try_from(b: char) -> Result<Self> {
56 match b {
57 '0' => Ok(PacketType::Connect),
58 '1' => Ok(PacketType::Disconnect),
59 '2' => Ok(PacketType::Event),
60 '3' => Ok(PacketType::Ack),
61 '4' => Ok(PacketType::ConnectError),
62 '5' => Ok(PacketType::BinaryEvent),
63 '6' => Ok(PacketType::BinaryAck),
64 _ => Err(Error::InvalidPacketType(b)),
65 }
66 }
67}
68
69impl Packet {
70 pub const fn new(
72 ptype: PacketType,
73 nsp: String,
74 data: Option<Value>,
75 id: Option<usize>,
76 attachment_count: u8,
77 attachments: Option<Vec<Bytes>>,
78 ) -> Self {
79 Packet {
80 ptype,
81 nsp,
82 data,
83 id,
84 attachment_count,
85 attachments,
86 }
87 }
88}
89
90impl From<Packet> for Bytes {
91 fn from(packet: Packet) -> Self {
92 Bytes::from(&packet)
93 }
94}
95
96impl From<&Packet> for Bytes {
97 fn from(packet: &Packet) -> Bytes {
101 let mut string = (packet.ptype as u8).to_string();
103
104 if let PacketType::BinaryAck | PacketType::BinaryEvent = packet.ptype {
106 string.push_str(&packet.attachment_count.to_string());
107 string.push('-');
108 }
109
110 if packet.nsp != "/" {
113 string.push_str(packet.nsp.as_ref());
114 string.push(',');
115 }
116
117 if let Some(id) = packet.id.as_ref() {
119 string.push_str(&id.to_string());
120 }
121
122 let mut buffer = BytesMut::new();
123 buffer.put(string.as_ref());
124
125 if let Some(data) = &packet.data {
126 let data = serde_json::to_string(data).unwrap();
128 buffer.put(data.as_ref());
129 }
130
131 buffer.freeze()
132 }
133}
134
135impl TryFrom<Bytes> for Packet {
136 type Error = Error;
137 fn try_from(value: Bytes) -> Result<Self> {
138 Packet::try_from(&value)
139 }
140}
141
142impl TryFrom<&Bytes> for Packet {
143 type Error = Error;
144 fn try_from(payload: &Bytes) -> Result<Packet> {
152 let mut packet: Packet = Default::default();
153 let payload_utf8 =
154 String::from_utf8(payload.to_vec()).map_err(|e| InvalidUtf8(e.utf8_error()))?;
155 let mut utf8_iter = payload_utf8.chars().into_iter().peekable();
156 let mut next_utf8;
157 let mut char_buf: Vec<char> = vec![];
158
159 packet.ptype = PacketType::try_from(utf8_iter.next().ok_or(Error::IncompletePacket())?)?;
161
162 if let PacketType::BinaryAck | PacketType::BinaryEvent = packet.ptype {
164 loop {
165 next_utf8 = utf8_iter.peek().ok_or(Error::IncompletePacket())?;
166 if *next_utf8 == '-' {
167 let _ = utf8_iter.next(); break;
169 }
170 char_buf.push(utf8_iter.next().unwrap()); }
172 }
173 let count_str: String = char_buf.iter().collect();
174 if let Ok(count) = count_str.parse::<u8>() {
175 packet.attachment_count = count;
176 }
177
178 char_buf.clear();
180 next_utf8 = match utf8_iter.peek() {
181 Some(c) => c,
182 None => return Ok(packet),
183 };
184
185 if *next_utf8 == '/' {
186 char_buf.push(utf8_iter.next().unwrap()); loop {
188 next_utf8 = utf8_iter.peek().ok_or(Error::IncompletePacket())?;
189 if *next_utf8 == ',' {
190 let _ = utf8_iter.next(); break;
192 }
193 char_buf.push(utf8_iter.next().unwrap()); }
195 }
196 if !char_buf.is_empty() {
197 packet.nsp = char_buf.iter().collect();
198 }
199
200 char_buf.clear();
202 next_utf8 = match utf8_iter.peek() {
203 None => return Ok(packet),
204 Some(c) => c,
205 };
206
207 loop {
208 if !next_utf8.is_ascii_digit() {
209 break;
210 }
211 char_buf.push(utf8_iter.next().unwrap()); next_utf8 = match utf8_iter.peek() {
213 None => return Ok(packet),
214 Some(c) => c,
215 };
216 }
217
218 let count_str: String = char_buf.iter().collect();
219 if let Ok(count) = count_str.parse::<usize>() {
220 packet.id = Some(count);
221 }
222
223 let json_str: String = utf8_iter.into_iter().collect();
225 let json_data: Value = serde_json::from_str(&json_str).map_err(InvalidJson)?;
226
227 packet.data = match json_data {
228 Value::Array(vec) if vec.is_empty() => None,
229 _ => Some(json_data),
230 };
231
232 Ok(packet)
233 }
234}
235
236#[derive(Default)]
237pub(crate) struct AckIdGenerator {
238 seq: AtomicUsize,
239}
240
241impl AckIdGenerator {
242 pub fn generate(&self) -> usize {
243 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
244 seq as usize
245 }
246}
247
248#[cfg(test)]
249mod test {
250 use super::*;
251 use serde_json::json;
252
253 #[test]
254 fn test_decode() {
257 let payload = Bytes::from_static(b"0{\"token\":\"123\"}");
258 let packet = Packet::try_from(&payload);
259 assert!(packet.is_ok());
260
261 assert_eq!(
262 Packet::new(
263 PacketType::Connect,
264 "/".to_owned(),
265 Some(json!({"token": "123"})),
266 None,
267 0,
268 None,
269 ),
270 packet.unwrap()
271 );
272
273 let utf8_data = Some(json!({"token™":"123"}));
274 let utf8_payload = format!("0/admin™,{}", serde_json::to_string(&utf8_data).unwrap());
275 let payload = Bytes::from(utf8_payload);
276 let packet = Packet::try_from(&payload);
277 assert!(packet.is_ok());
278
279 assert_eq!(
280 Packet::new(
281 PacketType::Connect,
282 "/admin™".to_owned(),
283 utf8_data,
284 None,
285 0,
286 None,
287 ),
288 packet.unwrap()
289 );
290
291 let payload = Bytes::from_static(b"1/admin,");
292 let packet = Packet::try_from(&payload);
293 assert!(packet.is_ok());
294
295 assert_eq!(
296 Packet::new(
297 PacketType::Disconnect,
298 "/admin".to_owned(),
299 None,
300 None,
301 0,
302 None,
303 ),
304 packet.unwrap()
305 );
306
307 let payload = Bytes::from_static(b"2[\"hello\",1]");
308 let packet = Packet::try_from(&payload);
309 assert!(packet.is_ok());
310
311 assert_eq!(
312 Packet::new(
313 PacketType::Event,
314 "/".to_owned(),
315 Some(json!(["hello", 1])),
316 None,
317 0,
318 None,
319 ),
320 packet.unwrap()
321 );
322
323 let payload = Bytes::from_static(b"2/admin,456[\"project:delete\",123]");
324 let packet = Packet::try_from(&payload);
325 assert!(packet.is_ok());
326
327 assert_eq!(
328 Packet::new(
329 PacketType::Event,
330 "/admin".to_owned(),
331 Some(json!(["project:delete", 123])),
332 Some(456),
333 0,
334 None,
335 ),
336 packet.unwrap()
337 );
338
339 let payload = Bytes::from_static(b"3/admin,456[]");
340 let packet = Packet::try_from(&payload);
341 assert!(packet.is_ok());
342
343 assert_eq!(
344 Packet::new(
345 PacketType::Ack,
346 "/admin".to_owned(),
347 None,
348 Some(456),
349 0,
350 None,
351 ),
352 packet.unwrap()
353 );
354
355 let payload = Bytes::from_static(b"4/admin,{\"message\":\"Not authorized\"}");
356 let packet = Packet::try_from(&payload);
357 assert!(packet.is_ok());
358
359 assert_eq!(
360 Packet::new(
361 PacketType::ConnectError,
362 "/admin".to_owned(),
363 Some(json!({"message":"Not authorized"})),
364 None,
365 0,
366 None,
367 ),
368 packet.unwrap()
369 );
370
371 let payload = Bytes::from_static(b"51-[\"hello\",{\"_placeholder\":true,\"num\":0}]");
372 let packet = Packet::try_from(&payload);
373 assert!(packet.is_ok());
374
375 assert_eq!(
376 Packet::new(
377 PacketType::BinaryEvent,
378 "/".to_owned(),
379 Some(json!(["hello", {"_placeholder": true, "num":0}])),
380 None,
381 1,
382 None
383 ),
384 packet.unwrap()
385 );
386
387 let payload = Bytes::from_static(
388 b"51-/admin,456[\"project:delete\",{\"_placeholder\":true,\"num\":0}]",
389 );
390 let packet = Packet::try_from(&payload);
391 assert!(packet.is_ok());
392
393 assert_eq!(
394 Packet::new(
395 PacketType::BinaryEvent,
396 "/admin".to_owned(),
397 Some(json!(["project:delete", {"_placeholder": true, "num":0}])),
398 Some(456),
399 1,
400 None,
401 ),
402 packet.unwrap()
403 );
404
405 let payload = Bytes::from_static(b"61-/admin,456[{\"_placeholder\":true,\"num\":0}]");
406 let packet = Packet::try_from(&payload);
407 assert!(packet.is_ok());
408
409 assert_eq!(
410 Packet::new(
411 PacketType::BinaryAck,
412 "/admin".to_owned(),
413 Some(json!([{"_placeholder": true, "num": 0}])),
414 Some(456),
415 1,
416 None,
417 ),
418 packet.unwrap()
419 );
420 }
421
422 #[test]
423 fn test_encode() {
426 let packet = Packet::new(
427 PacketType::Connect,
428 "/".to_owned(),
429 Some(json!({"token": "123"})),
430 None,
431 0,
432 None,
433 );
434
435 assert_eq!(
436 Bytes::from(&packet),
437 "0{\"token\":\"123\"}".to_string().into_bytes()
438 );
439
440 let packet = Packet::new(
441 PacketType::Connect,
442 "/admin".to_owned(),
443 Some(json!({"token": "123"})),
444 None,
445 0,
446 None,
447 );
448
449 assert_eq!(
450 Bytes::from(&packet),
451 "0/admin,{\"token\":\"123\"}".to_string().into_bytes()
452 );
453
454 let packet = Packet::new(
455 PacketType::Disconnect,
456 "/admin".to_owned(),
457 None,
458 None,
459 0,
460 None,
461 );
462
463 assert_eq!(Bytes::from(&packet), "1/admin,".to_string().into_bytes());
464
465 let packet = Packet::new(
466 PacketType::Event,
467 "/".to_owned(),
468 Some(json!(["hello", 1])),
469 None,
470 0,
471 None,
472 );
473
474 assert_eq!(
475 Bytes::from(&packet),
476 "2[\"hello\",1]".to_string().into_bytes()
477 );
478
479 let packet = Packet::new(
480 PacketType::Event,
481 "/admin".to_owned(),
482 Some(json!(["project:delete", 123])),
483 Some(456),
484 0,
485 None,
486 );
487
488 assert_eq!(
489 Bytes::from(&packet),
490 "2/admin,456[\"project:delete\",123]"
491 .to_string()
492 .into_bytes()
493 );
494
495 let packet = Packet::new(
496 PacketType::Ack,
497 "/admin".to_owned(),
498 Some(json!([])),
499 Some(456),
500 0,
501 None,
502 );
503
504 assert_eq!(
505 Bytes::from(&packet),
506 "3/admin,456[]".to_string().into_bytes()
507 );
508
509 let packet = Packet::new(
510 PacketType::ConnectError,
511 "/admin".to_owned(),
512 Some(json!({"message": "Not authorized"})),
513 None,
514 0,
515 None,
516 );
517
518 assert_eq!(
519 Bytes::from(&packet),
520 "4/admin,{\"message\":\"Not authorized\"}"
521 .to_string()
522 .into_bytes()
523 );
524
525 let packet = Packet::new(
526 PacketType::BinaryEvent,
527 "/".to_owned(),
528 Some(json!(["hello", {"_placeholder": true, "num": 0}])),
529 None,
530 1,
531 Some(vec![Bytes::from_static(&[1, 2, 3])]),
532 );
533
534 assert_eq!(
535 Bytes::from(&packet),
536 "51-[\"hello\",{\"_placeholder\":true,\"num\":0}]"
537 .to_string()
538 .into_bytes()
539 );
540
541 let packet = Packet::new(
542 PacketType::BinaryEvent,
543 "/admin".to_owned(),
544 Some(json!(["project:delete", {"_placeholder": true, "num": 0}])),
545 Some(456),
546 1,
547 Some(vec![Bytes::from_static(&[1, 2, 3])]),
548 );
549
550 assert_eq!(
551 Bytes::from(&packet),
552 "51-/admin,456[\"project:delete\",{\"_placeholder\":true,\"num\":0}]"
553 .to_string()
554 .into_bytes()
555 );
556
557 let packet = Packet::new(
558 PacketType::BinaryAck,
559 "/admin".to_owned(),
560 Some(json!([{"_placeholder": true, "num": 0}])),
561 Some(456),
562 1,
563 Some(vec![Bytes::from_static(&[3, 2, 1])]),
564 );
565
566 assert_eq!(
567 Bytes::from(&packet),
568 "61-/admin,456[{\"_placeholder\":true,\"num\":0}]"
569 .to_string()
570 .into_bytes()
571 );
572
573 let packet = Packet::new(
574 PacketType::BinaryEvent,
575 "/admin".to_owned(),
576 Some(
577 json!(["project:delete", {"_placeholder": true, "num": 0},{"_placeholder": true, "num": 1}]),
578 ),
579 Some(456),
580 2,
581 Some(vec![
582 Bytes::from_static(&[3, 2, 1]),
583 Bytes::from_static(&[4]),
584 ]),
585 );
586
587 assert_eq!(
588 Bytes::from(&packet),
589 "52-/admin,456[\"project:delete\",{\"_placeholder\":true,\"num\":0},{\"_placeholder\":true,\"num\":1}]"
590 .to_string()
591 .into_bytes()
592 );
593 }
594
595 #[test]
596 fn test_illegal_packet_id() {
597 let _sut = PacketType::try_from(42).expect_err("error!");
598 assert!(matches!(Error::InvalidPacketType(42 as char), _sut))
599 }
600}