1use bytes::{Buf, BufMut, Bytes, BytesMut};
47use tokio_util::codec::{Decoder, Encoder};
48
49use crate::{
50 codec_utils::{calculate_data_crc16, calculate_header_crc16, flags, msg_types},
51 compression::Compressor,
52 encryption::DesEncryptor,
53 error::{Result, ZusError},
54};
55
/// First byte of every mobile message header.
pub const MOBILE_MAGIC: u8 = 0xDF;

/// First byte of every heartbeat packet.
pub const HEARTBEAT_MAGIC: u8 = 0xEF;

/// Encoded size of a `MobileProtocolHeader` in bytes:
/// four `u8` fields, four `u16` fields and one `u32` field.
pub const MOBILE_HEADER_SIZE: usize = 16;

/// Encoded size of a `MobileHeartbeat` in bytes: one magic byte plus seven reserved bytes.
pub const HEARTBEAT_SIZE: usize = 8;

/// Threshold handed to `Compressor::with_config` by the codec's default constructors.
/// NOTE(review): presumably payloads shorter than this many bytes are sent uncompressed —
/// confirm against `Compressor`.
pub const MOBILE_COMPRESS_THRESHOLD: usize = 256;
71
72#[derive(Debug, Clone)]
76pub struct MobileProtocolHeader {
77 pub magic: u8,
79 pub version: u8,
81 pub msg_type: u8,
83 pub flags: u8,
85 pub headcrc16: u16,
87 pub datacrc: u16,
89 pub seq_id: u16,
91 pub timeout: u16,
93 pub body_length: u32,
95}
96
97impl MobileProtocolHeader {
98 pub fn new(seq_id: u16, msg_type: u8) -> Self {
100 Self {
101 magic: MOBILE_MAGIC,
102 version: 0,
103 msg_type,
104 flags: 0,
105 headcrc16: 0,
106 datacrc: 0,
107 seq_id,
108 timeout: 10, body_length: 0,
110 }
111 }
112
113 pub fn is_compressed(&self) -> bool {
115 (self.flags & flags::COMPRESSED) != 0
116 }
117
118 pub fn is_encrypted(&self) -> bool {
120 (self.flags & flags::ENCRYPTED) != 0
121 }
122
123 pub fn is_fragment(&self) -> bool {
125 (self.flags & flags::DATA_FRAG) != 0
126 }
127
128 pub fn set_compressed(&mut self, compressed: bool) {
130 if compressed {
131 self.flags |= flags::COMPRESSED;
132 } else {
133 self.flags &= !flags::COMPRESSED;
134 }
135 }
136
137 pub fn set_encrypted(&mut self, encrypted: bool) {
139 if encrypted {
140 self.flags |= flags::ENCRYPTED;
141 } else {
142 self.flags &= !flags::ENCRYPTED;
143 }
144 }
145
146 pub fn set_fragment(&mut self, is_fragment: bool) {
148 if is_fragment {
149 self.flags |= flags::DATA_FRAG;
150 } else {
151 self.flags &= !flags::DATA_FRAG;
152 }
153 }
154
155 fn calculate_headcrc16(&self) -> u16 {
157 let fields: &[u64] = &[
159 self.magic as u64,
160 self.version as u64,
161 self.msg_type as u64,
162 self.flags as u64,
163 self.datacrc as u64,
164 self.seq_id as u64,
165 self.timeout as u64,
166 self.body_length as u64,
167 ];
168 calculate_header_crc16(fields)
169 }
170
171 pub fn calculate_datacrc(data: &[u8]) -> u16 {
173 calculate_data_crc16(data)
174 }
175
176 pub fn verify_datacrc(&self, data: &[u8]) -> bool {
178 Self::calculate_datacrc(data) == self.datacrc
179 }
180
181 pub fn encode(&self, buf: &mut BytesMut) {
183 let headcrc16 = self.calculate_headcrc16();
184
185 buf.put_u8(self.magic);
186 buf.put_u8(self.version);
187 buf.put_u8(self.msg_type);
188 buf.put_u8(self.flags);
189 buf.put_u16(headcrc16);
190 buf.put_u16(self.datacrc);
191 buf.put_u16(self.seq_id);
192 buf.put_u16(self.timeout);
193 buf.put_u32(self.body_length);
194 }
195
196 pub fn decode(buf: &mut BytesMut) -> Result<Self> {
198 if buf.len() < MOBILE_HEADER_SIZE {
199 return Err(ZusError::Protocol(format!(
200 "Insufficient data for mobile header: got {} bytes, need {}",
201 buf.len(),
202 MOBILE_HEADER_SIZE
203 )));
204 }
205
206 let magic = buf.get_u8();
207 if magic != MOBILE_MAGIC {
208 return Err(ZusError::InvalidMagic(magic as u16));
209 }
210
211 let version = buf.get_u8();
212 let msg_type = buf.get_u8();
213 let flags = buf.get_u8();
214 let headcrc16 = buf.get_u16();
215 let datacrc = buf.get_u16();
216 let seq_id = buf.get_u16();
217 let timeout = buf.get_u16();
218 let body_length = buf.get_u32();
219
220 let header = Self {
221 magic,
222 version,
223 msg_type,
224 flags,
225 headcrc16,
226 datacrc,
227 seq_id,
228 timeout,
229 body_length,
230 };
231
232 let calculated_crc = header.calculate_headcrc16();
234 if calculated_crc != headcrc16 {
235 return Err(ZusError::Protocol(format!(
236 "Invalid mobile header CRC16: expected {calculated_crc:#06x}, got {headcrc16:#06x}"
237 )));
238 }
239
240 Ok(header)
241 }
242}
243
244#[derive(Debug, Clone, Copy, Default)]
246pub struct MobileHeartbeat {
247 pub magic: u8,
249 pub reserved: [u8; 7],
251}
252
253impl MobileHeartbeat {
254 pub fn new() -> Self {
256 Self {
257 magic: HEARTBEAT_MAGIC,
258 reserved: [0u8; 7],
259 }
260 }
261
262 pub fn encode(&self, buf: &mut BytesMut) {
264 buf.put_u8(self.magic);
265 buf.put_slice(&self.reserved);
266 }
267
268 pub fn decode(buf: &mut BytesMut) -> Result<Self> {
270 if buf.len() < HEARTBEAT_SIZE {
271 return Err(ZusError::Protocol(format!(
272 "Insufficient data for heartbeat: got {} bytes, need {}",
273 buf.len(),
274 HEARTBEAT_SIZE
275 )));
276 }
277
278 let magic = buf.get_u8();
279 if magic != HEARTBEAT_MAGIC {
280 return Err(ZusError::Protocol(format!(
281 "Invalid heartbeat magic: expected {HEARTBEAT_MAGIC:#04x}, got {magic:#04x}"
282 )));
283 }
284
285 let mut reserved = [0u8; 7];
286 buf.copy_to_slice(&mut reserved);
287
288 Ok(Self { magic, reserved })
289 }
290
291 pub fn is_heartbeat(buf: &[u8]) -> bool {
293 !buf.is_empty() && buf[0] == HEARTBEAT_MAGIC
294 }
295}
296
297#[derive(Debug, Clone)]
299pub struct MobileMessage {
300 pub header: MobileProtocolHeader,
301 pub method: Bytes,
302 pub body: Bytes,
303}
304
305impl MobileMessage {
306 pub fn new_request(seq_id: u16, method: Bytes, body: Bytes) -> Self {
308 let mut header = MobileProtocolHeader::new(seq_id, msg_types::REQUEST);
309 header.body_length = (method.len() + body.len()) as u32;
310 header.datacrc = MobileProtocolHeader::calculate_datacrc(&body);
311 Self { header, method, body }
312 }
313
314 pub fn new_response(seq_id: u16, body: Bytes) -> Self {
316 let mut header = MobileProtocolHeader::new(seq_id, msg_types::RESPONSE);
317 header.body_length = body.len() as u32;
318 header.datacrc = MobileProtocolHeader::calculate_datacrc(&body);
319 Self {
320 header,
321 method: Bytes::new(),
322 body,
323 }
324 }
325
326 pub fn new_notify(seq_id: u16, method: Bytes, body: Bytes) -> Self {
328 let mut header = MobileProtocolHeader::new(seq_id, msg_types::NOTIFY);
329 header.body_length = (method.len() + body.len()) as u32;
330 header.datacrc = MobileProtocolHeader::calculate_datacrc(&body);
331 Self { header, method, body }
332 }
333
334 pub fn new_sysrsp(seq_id: u16, body: Bytes) -> Self {
336 let mut header = MobileProtocolHeader::new(seq_id, msg_types::SYS_RESPONSE);
337 header.body_length = body.len() as u32;
338 header.datacrc = MobileProtocolHeader::calculate_datacrc(&body);
339 Self {
340 header,
341 method: Bytes::new(),
342 body,
343 }
344 }
345}
346
/// One unit of the mobile wire protocol, as produced by the `MobileCodec` decoder and
/// accepted by its encoder.
#[derive(Debug, Clone)]
pub enum MobilePacket {
  /// A header-framed message (first wire byte is `MOBILE_MAGIC`).
  Message(MobileMessage),
  /// A fixed-size heartbeat packet (first wire byte is `HEARTBEAT_MAGIC`).
  Heartbeat(MobileHeartbeat),
}
355
356pub struct MobileCodec {
361 max_frame_length: usize,
362 pub compressor: Compressor,
363 encryptor: Option<DesEncryptor>,
364}
365
366impl MobileCodec {
367 pub fn new() -> Self {
371 Self {
372 max_frame_length: 10 * 1024 * 1024, compressor: Compressor::with_config(
374 true,
375 MOBILE_COMPRESS_THRESHOLD,
376 crate::compression::CompressionType::QuickLZ,
377 ),
378 encryptor: None,
379 }
380 }
381
382 pub fn with_max_frame_length(max_frame_length: usize) -> Self {
384 Self {
385 max_frame_length,
386 compressor: Compressor::with_config(
387 true,
388 MOBILE_COMPRESS_THRESHOLD,
389 crate::compression::CompressionType::QuickLZ,
390 ),
391 encryptor: None,
392 }
393 }
394
395 pub fn with_compressor(compressor: Compressor) -> Self {
397 Self {
398 max_frame_length: 10 * 1024 * 1024,
399 compressor,
400 encryptor: None,
401 }
402 }
403
404 pub fn with_encryption(key: &[u8]) -> Result<Self> {
406 let encryptor = DesEncryptor::try_new(key)?;
407 Ok(Self {
408 max_frame_length: 10 * 1024 * 1024,
409 compressor: Compressor::with_config(
410 true,
411 MOBILE_COMPRESS_THRESHOLD,
412 crate::compression::CompressionType::QuickLZ,
413 ),
414 encryptor: Some(encryptor),
415 })
416 }
417
418 pub fn with_full_config(
420 max_frame_length: usize,
421 compressor: Compressor,
422 encryption_key: Option<&[u8]>,
423 ) -> Result<Self> {
424 let encryptor = match encryption_key {
425 | Some(key) => Some(DesEncryptor::try_new(key)?),
426 | None => None,
427 };
428 Ok(Self {
429 max_frame_length,
430 compressor,
431 encryptor,
432 })
433 }
434
435 pub fn set_encryption_key(&mut self, key: Option<&[u8]>) -> Result<()> {
437 self.encryptor = match key {
438 | Some(k) => Some(DesEncryptor::try_new(k)?),
439 | None => None,
440 };
441 Ok(())
442 }
443
444 pub fn is_encryption_enabled(&self) -> bool {
446 self.encryptor.is_some()
447 }
448
449 fn peek_packet_type(buf: &[u8]) -> Option<PacketType> {
451 if buf.is_empty() {
452 return None;
453 }
454 match buf[0] {
455 | MOBILE_MAGIC => Some(PacketType::Message),
456 | HEARTBEAT_MAGIC => Some(PacketType::Heartbeat),
457 | _ => None,
458 }
459 }
460}
461
/// Kind of packet identified from the first byte of the read buffer
/// (see `MobileCodec::peek_packet_type`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PacketType {
  /// First byte is `MOBILE_MAGIC`.
  Message,
  /// First byte is `HEARTBEAT_MAGIC`.
  Heartbeat,
}
467
468impl Default for MobileCodec {
469 fn default() -> Self {
470 Self::new()
471 }
472}
473
474impl Decoder for MobileCodec {
475 type Error = ZusError;
476 type Item = MobilePacket;
477
478 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
479 if src.is_empty() {
480 return Ok(None);
481 }
482
483 let packet_type = match Self::peek_packet_type(src) {
485 | Some(pt) => pt,
486 | None => {
487 return Err(ZusError::InvalidMagic(src[0] as u16));
488 }
489 };
490
491 match packet_type {
492 | PacketType::Heartbeat => {
493 if src.len() < HEARTBEAT_SIZE {
494 return Ok(None);
495 }
496 let heartbeat = MobileHeartbeat::decode(src)?;
497 Ok(Some(MobilePacket::Heartbeat(heartbeat)))
498 }
499 | PacketType::Message => {
500 if src.len() < MOBILE_HEADER_SIZE {
501 return Ok(None);
502 }
503
504 let mut peek_buf = src.clone();
506 let header = MobileProtocolHeader::decode(&mut peek_buf)?;
507
508 let total_length = MOBILE_HEADER_SIZE + header.body_length as usize;
509 if src.len() < total_length {
510 src.reserve(total_length - src.len());
511 return Ok(None);
512 }
513
514 if total_length > self.max_frame_length {
516 return Err(ZusError::Protocol(format!(
517 "Mobile frame too large: {} > {}",
518 total_length, self.max_frame_length
519 )));
520 }
521
522 let header = MobileProtocolHeader::decode(src)?;
524
525 let raw_body = src.split_to(header.body_length as usize).freeze();
527
528 if !header.verify_datacrc(&raw_body) {
530 return Err(ZusError::CrcMismatch);
531 }
532
533 let decrypted_body = if header.is_encrypted() {
535 match &self.encryptor {
536 | Some(enc) => Bytes::from(enc.decrypt(&raw_body)?),
537 | None => {
538 return Err(ZusError::Encryption(
539 "Received encrypted message but no encryption key configured".to_string(),
540 ));
541 }
542 }
543 } else {
544 raw_body
545 };
546
547 let (method, body) = if header.msg_type == msg_types::REQUEST || header.msg_type == msg_types::NOTIFY {
549 let mut body_buf = decrypted_body;
551
552 if body_buf.len() < 4 {
553 return Err(ZusError::Protocol("Invalid method length".to_string()));
554 }
555 let method_len = body_buf.get_u32() as usize;
556 if body_buf.len() < method_len {
557 return Err(ZusError::Protocol("Invalid method data".to_string()));
558 }
559 let method = body_buf.split_to(method_len);
560
561 if body_buf.len() < 4 {
562 return Err(ZusError::Protocol("Invalid params length".to_string()));
563 }
564 let params_len = body_buf.get_u32() as usize;
565 if body_buf.len() < params_len {
566 return Err(ZusError::Protocol("Invalid params data".to_string()));
567 }
568 let raw_params = body_buf.split_to(params_len);
569
570 let params = if header.is_compressed() {
572 self.compressor.decompress(&raw_params)?
573 } else {
574 raw_params
575 };
576
577 (method, params)
578 } else {
579 let body_data = if header.is_encrypted() {
581 let mut body_buf = decrypted_body;
583 if body_buf.len() < 4 {
584 return Err(ZusError::Protocol("Invalid encrypted response".to_string()));
585 }
586 let original_len = body_buf.get_u32() as usize;
587 if body_buf.len() < original_len {
588 return Err(ZusError::Protocol("Invalid encrypted response length".to_string()));
589 }
590 body_buf.split_to(original_len)
591 } else {
592 decrypted_body
593 };
594
595 let body = if header.is_compressed() {
596 self.compressor.decompress(&body_data)?
597 } else {
598 body_data
599 };
600 (Bytes::new(), body)
601 };
602
603 Ok(Some(MobilePacket::Message(MobileMessage { header, method, body })))
604 }
605 }
606 }
607}
608
609impl Encoder<MobilePacket> for MobileCodec {
610 type Error = ZusError;
611
612 fn encode(&mut self, item: MobilePacket, dst: &mut BytesMut) -> Result<()> {
613 match item {
614 | MobilePacket::Heartbeat(hb) => {
615 dst.reserve(HEARTBEAT_SIZE);
616 hb.encode(dst);
617 Ok(())
618 }
619 | MobilePacket::Message(mut msg) => {
620 let is_request = !msg.method.is_empty();
621 let mut full_body = BytesMut::new();
622 let was_compressed: bool;
623
624 if is_request {
625 let (compressed_params, params_compressed) = self.compressor.compress(&msg.body)?;
627 was_compressed = params_compressed;
628
629 full_body.put_u32(msg.method.len() as u32);
630 full_body.put(msg.method);
631 full_body.put_u32(compressed_params.len() as u32);
632 full_body.put(compressed_params);
633 } else {
634 let (compressed_body, body_compressed) = self.compressor.compress(&msg.body)?;
636 was_compressed = body_compressed;
637 full_body.put(compressed_body);
638 }
639
640 let final_body = if let Some(ref enc) = self.encryptor {
642 let data_to_encrypt = if is_request {
643 full_body.freeze()
644 } else {
645 let mut with_len = BytesMut::with_capacity(4 + full_body.len());
646 with_len.put_u32(full_body.len() as u32);
647 with_len.put(full_body);
648 with_len.freeze()
649 };
650 let encrypted = enc.encrypt(&data_to_encrypt)?;
651 msg.header.set_encrypted(true);
652 Bytes::from(encrypted)
653 } else {
654 msg.header.set_encrypted(false);
655 full_body.freeze()
656 };
657
658 msg.header.body_length = final_body.len() as u32;
660 msg.header.set_compressed(was_compressed);
661 msg.header.datacrc = MobileProtocolHeader::calculate_datacrc(&final_body);
662
663 let total_length = MOBILE_HEADER_SIZE + final_body.len();
665 dst.reserve(total_length);
666 msg.header.encode(dst);
667 dst.put(final_body);
668
669 Ok(())
670 }
671 }
672 }
673}
674
675impl Encoder<MobileMessage> for MobileCodec {
677 type Error = ZusError;
678
679 fn encode(&mut self, item: MobileMessage, dst: &mut BytesMut) -> Result<()> {
680 <Self as Encoder<MobilePacket>>::encode(self, MobilePacket::Message(item), dst)
681 }
682}
683
684impl Encoder<MobileHeartbeat> for MobileCodec {
686 type Error = ZusError;
687
688 fn encode(&mut self, item: MobileHeartbeat, dst: &mut BytesMut) -> Result<()> {
689 <Self as Encoder<MobilePacket>>::encode(self, MobilePacket::Heartbeat(item), dst)
690 }
691}
692
#[cfg(test)]
mod tests {
  use super::*;

  /// A header occupies exactly `MOBILE_HEADER_SIZE` bytes and survives a round trip.
  #[test]
  fn test_mobile_header_encode_decode() {
    let mut original = MobileProtocolHeader::new(123, msg_types::REQUEST);
    original.body_length = 100;
    original.datacrc = 0x1234;

    let mut wire = BytesMut::new();
    original.encode(&mut wire);
    assert_eq!(wire.len(), MOBILE_HEADER_SIZE);

    let parsed = MobileProtocolHeader::decode(&mut wire).unwrap();
    assert_eq!(parsed.magic, MOBILE_MAGIC);
    assert_eq!(parsed.seq_id, 123);
    assert_eq!(parsed.msg_type, msg_types::REQUEST);
    assert_eq!(parsed.body_length, 100);
    assert_eq!(parsed.datacrc, 0x1234);
  }

  /// A heartbeat occupies exactly `HEARTBEAT_SIZE` bytes and survives a round trip.
  #[test]
  fn test_heartbeat_encode_decode() {
    let mut wire = BytesMut::new();
    MobileHeartbeat::new().encode(&mut wire);

    assert_eq!(wire.len(), HEARTBEAT_SIZE);
    assert_eq!(wire[0], HEARTBEAT_MAGIC);

    let parsed = MobileHeartbeat::decode(&mut wire).unwrap();
    assert_eq!(parsed.magic, HEARTBEAT_MAGIC);
  }

  /// A request goes through the codec unchanged (sequence id, method and body).
  #[test]
  fn test_mobile_codec_message_roundtrip() {
    let mut codec = MobileCodec::new();
    let method = Bytes::from("test.method");
    let body = Bytes::from("hello mobile");

    let mut wire = BytesMut::new();
    let request = MobileMessage::new_request(42, method.clone(), body.clone());
    codec.encode(MobilePacket::Message(request), &mut wire).unwrap();

    let decoded = codec.decode(&mut wire).unwrap().unwrap();
    if let MobilePacket::Message(msg) = decoded {
      assert_eq!(msg.header.seq_id, 42);
      assert_eq!(msg.method, method);
      assert_eq!(msg.body, body);
    } else {
      panic!("Expected message, got heartbeat");
    }
  }

  /// A heartbeat goes through the codec and comes back as a heartbeat packet.
  #[test]
  fn test_mobile_codec_heartbeat_roundtrip() {
    let mut codec = MobileCodec::new();

    let mut wire = BytesMut::new();
    codec
      .encode(MobilePacket::Heartbeat(MobileHeartbeat::new()), &mut wire)
      .unwrap();

    let decoded = codec.decode(&mut wire).unwrap().unwrap();
    if let MobilePacket::Heartbeat(hb) = decoded {
      assert_eq!(hb.magic, HEARTBEAT_MAGIC);
    } else {
      panic!("Expected heartbeat, got message");
    }
  }

  /// With a key configured the ENCRYPTED flag is set on the wire and the request still
  /// round-trips.
  #[test]
  fn test_mobile_codec_with_encryption() {
    let key = b"12345678";
    let mut codec = MobileCodec::with_encryption(key).unwrap();
    let method = Bytes::from("encrypted.call");
    let body = Bytes::from("secret data");

    let mut wire = BytesMut::new();
    let request = MobileMessage::new_request(1, method.clone(), body.clone());
    codec.encode(MobilePacket::Message(request), &mut wire).unwrap();

    // Byte 3 of the header is the flags byte.
    assert_eq!(wire[3] & flags::ENCRYPTED, flags::ENCRYPTED);

    let decoded = codec.decode(&mut wire).unwrap().unwrap();
    if let MobilePacket::Message(msg) = decoded {
      assert_eq!(msg.method, method);
      assert_eq!(msg.body, body);
    } else {
      panic!("Expected message");
    }
  }

  /// A response round-trips with an empty method.
  #[test]
  fn test_mobile_codec_response() {
    let mut codec = MobileCodec::new();
    let body = Bytes::from("response data");

    let mut wire = BytesMut::new();
    let response = MobileMessage::new_response(99, body.clone());
    codec.encode(MobilePacket::Message(response), &mut wire).unwrap();

    let decoded = codec.decode(&mut wire).unwrap().unwrap();
    if let MobilePacket::Message(msg) = decoded {
      assert_eq!(msg.header.seq_id, 99);
      assert_eq!(msg.header.msg_type, msg_types::RESPONSE);
      assert!(msg.method.is_empty());
      assert_eq!(msg.body, body);
    } else {
      panic!("Expected message");
    }
  }

  /// Flag setters and getters agree, and the three flags together make 0x07.
  #[test]
  fn test_mobile_flags() {
    let mut header = MobileProtocolHeader::new(1, msg_types::REQUEST);

    assert!(!header.is_compressed());
    assert!(!header.is_encrypted());
    assert!(!header.is_fragment());

    header.set_compressed(true);
    assert!(header.is_compressed());

    header.set_encrypted(true);
    assert!(header.is_encrypted());

    header.set_fragment(true);
    assert!(header.is_fragment());

    assert_eq!(header.flags, 0x07);
  }

  /// A header-sized buffer that starts with the wrong magic byte is rejected.
  #[test]
  fn test_invalid_magic() {
    let mut wire = BytesMut::new();
    wire.put_u8(0xFF);
    wire.put_slice(&[0u8; 15]);

    assert!(MobileProtocolHeader::decode(&mut wire).is_err());
  }

  /// `is_heartbeat` looks only at the first byte and handles an empty slice.
  #[test]
  fn test_is_heartbeat() {
    let heartbeat_buf = [HEARTBEAT_MAGIC, 0, 0, 0, 0, 0, 0, 0];
    let message_buf = [MOBILE_MAGIC, 0, 0, 0, 0, 0, 0, 0];

    assert!(MobileHeartbeat::is_heartbeat(&heartbeat_buf));
    assert!(!MobileHeartbeat::is_heartbeat(&message_buf));
    assert!(!MobileHeartbeat::is_heartbeat(&[]));
  }

  /// The default codec configures its compressor with `MOBILE_COMPRESS_THRESHOLD`.
  #[test]
  fn test_compression_threshold() {
    assert_eq!(
      MobileCodec::new().compressor.threshold_bytes,
      MOBILE_COMPRESS_THRESHOLD
    );
  }
}