zus_common/
mobile_codec.rs

1//! Mobile Protocol Codec (16-byte header)
2//!
3//! This module implements the mobile-optimized ZUS RPC protocol with a compact
4//! 16-byte header designed for mobile clients with bandwidth constraints.
5//!
6//! ## Protocol Differences from Standard (40-byte)
7//!
8//! | Feature           | Standard (40-byte)      | Mobile (16-byte)        |
9//! |-------------------|-------------------------|-------------------------|
10//! | Header size       | 40 bytes                | 16 bytes                |
11//! | Magic             | 0xD3A7 (2 bytes)        | 0xDF (1 byte)           |
12//! | Sequence ID       | u64 (8 bytes)           | u16 (2 bytes)           |
13//! | Timeout           | u64 (8 bytes)           | u16 (2 bytes, seconds)  |
14//! | Data CRC          | u32 (full CRC32)        | u16 (lower 16 bits)     |
15//! | Trace ID          | u64 (8 bytes)           | Not present             |
16//! | Compress threshold| 4096 bytes              | 256 bytes               |
17//!
18//! ## Header Layout (16 bytes, Big-Endian)
19//!
20//! ```text
21//! Offset  Field       Type    Size    Description
22//! ------  ----------  ------  ------  -----------
23//! 0       magic       u8      1       0xDF
24//! 1       version     u8      1       Protocol version
25//! 2       cmdtype     u8      1       Message type (1=REQ, 2=RSP, 3=NOTIFY, 4=SYSRSP)
26//! 3       flag        u8      1       Flags: bit0=compressed, bit1=encrypted, bit2=fragment
27//! 4       headcrc16   u16     2       Header CRC (truncating addition)
28//! 6       datacrc     u16     2       Data CRC (lower 16 bits of CRC32)
29//! 8       seqId       u16     2       Sequence ID (0-65535, wraps around)
30//! 10      timeout     u16     2       Timeout in seconds (not milliseconds!)
31//! 12      datalen     u32     4       Body length in bytes
32//! Total: 16 bytes
33//! ```
34//!
35//! ## Heartbeat Packet (8 bytes)
36//!
37//! Mobile clients send periodic heartbeat packets with magic 0xEF:
38//! ```text
39//! Offset  Field       Type    Size    Description
40//! ------  ----------  ------  ------  -----------
41//! 0       magic       u8      1       0xEF
42//! 1-7     padding     [u8;7]  7       Reserved/padding
43//! Total: 8 bytes
44//! ```
45
46use bytes::{Buf, BufMut, Bytes, BytesMut};
47use tokio_util::codec::{Decoder, Encoder};
48
49use crate::{
50  codec_utils::{calculate_data_crc16, calculate_header_crc16, flags, msg_types},
51  compression::Compressor,
52  encryption::DesEncryptor,
53  error::{Result, ZusError},
54};
55
/// Mobile protocol magic number (1 byte).
///
/// First byte of every mobile message header; the codec uses it to tell
/// messages apart from heartbeats.
pub const MOBILE_MAGIC: u8 = 0xDF;

/// Mobile heartbeat magic number (1 byte).
///
/// First byte of every heartbeat packet.
pub const HEARTBEAT_MAGIC: u8 = 0xEF;

/// Mobile protocol header size in bytes.
pub const MOBILE_HEADER_SIZE: usize = 16;

/// Heartbeat packet size in bytes (1 magic byte + 7 reserved bytes).
pub const HEARTBEAT_SIZE: usize = 8;

/// Default compression threshold for mobile protocol (256 bytes).
///
/// Lower than standard (4KB) to optimize for mobile bandwidth.
pub const MOBILE_COMPRESS_THRESHOLD: usize = 256;
71
/// Mobile Protocol Header (16 bytes)
///
/// Compact header designed for mobile clients with bandwidth constraints.
/// Fields are listed in wire order; multi-byte fields are written big-endian
/// by [`MobileProtocolHeader::encode`].
#[derive(Debug, Clone)]
pub struct MobileProtocolHeader {
  /// Magic number: 0xDF
  pub magic: u8,
  /// Protocol version
  pub version: u8,
  /// Message type: REQ(1), RSP(2), NOTIFY(3), SYSRSP(4)
  pub msg_type: u8,
  /// Flags: bit0=compressed, bit1=encrypted, bit2=fragment
  pub flags: u8,
  /// Header CRC16 (truncating addition checksum).
  ///
  /// `encode` ignores this stored value and writes a freshly computed one;
  /// `decode` stores the value read from the wire after verifying it.
  pub headcrc16: u16,
  /// Data CRC16 (lower 16 bits of CRC32)
  pub datacrc: u16,
  /// Sequence ID (16-bit, wraps around)
  pub seq_id: u16,
  /// Timeout in seconds (not milliseconds!)
  pub timeout: u16,
  /// Body length in bytes (the bytes that follow the header on the wire)
  pub body_length: u32,
}
96
97impl MobileProtocolHeader {
98  /// Create a new mobile protocol header
99  pub fn new(seq_id: u16, msg_type: u8) -> Self {
100    Self {
101      magic: MOBILE_MAGIC,
102      version: 0,
103      msg_type,
104      flags: 0,
105      headcrc16: 0,
106      datacrc: 0,
107      seq_id,
108      timeout: 10, // 10 seconds default
109      body_length: 0,
110    }
111  }
112
113  /// Check if data is compressed
114  pub fn is_compressed(&self) -> bool {
115    (self.flags & flags::COMPRESSED) != 0
116  }
117
118  /// Check if data is encrypted
119  pub fn is_encrypted(&self) -> bool {
120    (self.flags & flags::ENCRYPTED) != 0
121  }
122
123  /// Check if this is a fragment (mobile-specific)
124  pub fn is_fragment(&self) -> bool {
125    (self.flags & flags::DATA_FRAG) != 0
126  }
127
128  /// Set compressed flag
129  pub fn set_compressed(&mut self, compressed: bool) {
130    if compressed {
131      self.flags |= flags::COMPRESSED;
132    } else {
133      self.flags &= !flags::COMPRESSED;
134    }
135  }
136
137  /// Set encrypted flag
138  pub fn set_encrypted(&mut self, encrypted: bool) {
139    if encrypted {
140      self.flags |= flags::ENCRYPTED;
141    } else {
142      self.flags &= !flags::ENCRYPTED;
143    }
144  }
145
146  /// Set fragment flag
147  pub fn set_fragment(&mut self, is_fragment: bool) {
148    if is_fragment {
149      self.flags |= flags::DATA_FRAG;
150    } else {
151      self.flags &= !flags::DATA_FRAG;
152    }
153  }
154
155  /// Calculate header CRC16 (excluding headcrc16 field itself)
156  fn calculate_headcrc16(&self) -> u16 {
157    // Fields to include in CRC calculation (truncated to 16 bits each)
158    let fields: &[u64] = &[
159      self.magic as u64,
160      self.version as u64,
161      self.msg_type as u64,
162      self.flags as u64,
163      self.datacrc as u64,
164      self.seq_id as u64,
165      self.timeout as u64,
166      self.body_length as u64,
167    ];
168    calculate_header_crc16(fields)
169  }
170
171  /// Calculate data CRC16 for body
172  pub fn calculate_datacrc(data: &[u8]) -> u16 {
173    calculate_data_crc16(data)
174  }
175
176  /// Verify data CRC
177  pub fn verify_datacrc(&self, data: &[u8]) -> bool {
178    Self::calculate_datacrc(data) == self.datacrc
179  }
180
181  /// Encode header to bytes (16 bytes, big-endian)
182  pub fn encode(&self, buf: &mut BytesMut) {
183    let headcrc16 = self.calculate_headcrc16();
184
185    buf.put_u8(self.magic);
186    buf.put_u8(self.version);
187    buf.put_u8(self.msg_type);
188    buf.put_u8(self.flags);
189    buf.put_u16(headcrc16);
190    buf.put_u16(self.datacrc);
191    buf.put_u16(self.seq_id);
192    buf.put_u16(self.timeout);
193    buf.put_u32(self.body_length);
194  }
195
196  /// Decode header from bytes
197  pub fn decode(buf: &mut BytesMut) -> Result<Self> {
198    if buf.len() < MOBILE_HEADER_SIZE {
199      return Err(ZusError::Protocol(format!(
200        "Insufficient data for mobile header: got {} bytes, need {}",
201        buf.len(),
202        MOBILE_HEADER_SIZE
203      )));
204    }
205
206    let magic = buf.get_u8();
207    if magic != MOBILE_MAGIC {
208      return Err(ZusError::InvalidMagic(magic as u16));
209    }
210
211    let version = buf.get_u8();
212    let msg_type = buf.get_u8();
213    let flags = buf.get_u8();
214    let headcrc16 = buf.get_u16();
215    let datacrc = buf.get_u16();
216    let seq_id = buf.get_u16();
217    let timeout = buf.get_u16();
218    let body_length = buf.get_u32();
219
220    let header = Self {
221      magic,
222      version,
223      msg_type,
224      flags,
225      headcrc16,
226      datacrc,
227      seq_id,
228      timeout,
229      body_length,
230    };
231
232    // Verify header CRC
233    let calculated_crc = header.calculate_headcrc16();
234    if calculated_crc != headcrc16 {
235      return Err(ZusError::Protocol(format!(
236        "Invalid mobile header CRC16: expected {calculated_crc:#06x}, got {headcrc16:#06x}"
237      )));
238    }
239
240    Ok(header)
241  }
242}
243
244/// Mobile Heartbeat Packet (8 bytes)
245#[derive(Debug, Clone, Copy, Default)]
246pub struct MobileHeartbeat {
247  /// Magic: 0xEF
248  pub magic: u8,
249  /// Reserved bytes
250  pub reserved: [u8; 7],
251}
252
253impl MobileHeartbeat {
254  /// Create a new heartbeat packet
255  pub fn new() -> Self {
256    Self {
257      magic: HEARTBEAT_MAGIC,
258      reserved: [0u8; 7],
259    }
260  }
261
262  /// Encode heartbeat to bytes
263  pub fn encode(&self, buf: &mut BytesMut) {
264    buf.put_u8(self.magic);
265    buf.put_slice(&self.reserved);
266  }
267
268  /// Decode heartbeat from bytes
269  pub fn decode(buf: &mut BytesMut) -> Result<Self> {
270    if buf.len() < HEARTBEAT_SIZE {
271      return Err(ZusError::Protocol(format!(
272        "Insufficient data for heartbeat: got {} bytes, need {}",
273        buf.len(),
274        HEARTBEAT_SIZE
275      )));
276    }
277
278    let magic = buf.get_u8();
279    if magic != HEARTBEAT_MAGIC {
280      return Err(ZusError::Protocol(format!(
281        "Invalid heartbeat magic: expected {HEARTBEAT_MAGIC:#04x}, got {magic:#04x}"
282      )));
283    }
284
285    let mut reserved = [0u8; 7];
286    buf.copy_to_slice(&mut reserved);
287
288    Ok(Self { magic, reserved })
289  }
290
291  /// Check if buffer starts with heartbeat magic
292  pub fn is_heartbeat(buf: &[u8]) -> bool {
293    !buf.is_empty() && buf[0] == HEARTBEAT_MAGIC
294  }
295}
296
/// Mobile RPC Message
///
/// A decoded (or to-be-encoded) message: its header plus the logical method
/// name and payload.
#[derive(Debug, Clone)]
pub struct MobileMessage {
  /// Protocol header describing this message.
  pub header: MobileProtocolHeader,
  /// Method name bytes; empty for messages built by `new_response` / `new_sysrsp`.
  pub method: Bytes,
  /// Payload bytes (request parameters or response data).
  pub body: Bytes,
}
304
305impl MobileMessage {
306  /// Create a new request message
307  pub fn new_request(seq_id: u16, method: Bytes, body: Bytes) -> Self {
308    let mut header = MobileProtocolHeader::new(seq_id, msg_types::REQUEST);
309    header.body_length = (method.len() + body.len()) as u32;
310    header.datacrc = MobileProtocolHeader::calculate_datacrc(&body);
311    Self { header, method, body }
312  }
313
314  /// Create a new response message
315  pub fn new_response(seq_id: u16, body: Bytes) -> Self {
316    let mut header = MobileProtocolHeader::new(seq_id, msg_types::RESPONSE);
317    header.body_length = body.len() as u32;
318    header.datacrc = MobileProtocolHeader::calculate_datacrc(&body);
319    Self {
320      header,
321      method: Bytes::new(),
322      body,
323    }
324  }
325
326  /// Create a new notify message
327  pub fn new_notify(seq_id: u16, method: Bytes, body: Bytes) -> Self {
328    let mut header = MobileProtocolHeader::new(seq_id, msg_types::NOTIFY);
329    header.body_length = (method.len() + body.len()) as u32;
330    header.datacrc = MobileProtocolHeader::calculate_datacrc(&body);
331    Self { header, method, body }
332  }
333
334  /// Create a new system response message
335  pub fn new_sysrsp(seq_id: u16, body: Bytes) -> Self {
336    let mut header = MobileProtocolHeader::new(seq_id, msg_types::SYS_RESPONSE);
337    header.body_length = body.len() as u32;
338    header.datacrc = MobileProtocolHeader::calculate_datacrc(&body);
339    Self {
340      header,
341      method: Bytes::new(),
342      body,
343    }
344  }
345}
346
/// Decoded packet type from mobile protocol
///
/// This is the item type produced by the codec's `Decoder` implementation and
/// accepted by its `Encoder<MobilePacket>` implementation.
#[derive(Debug, Clone)]
pub enum MobilePacket {
  /// Regular RPC message (16-byte header plus body)
  Message(MobileMessage),
  /// Heartbeat packet (8 bytes)
  Heartbeat(MobileHeartbeat),
}
355
/// Mobile Protocol Codec
///
/// Handles encoding/decoding of mobile protocol messages (16-byte header)
/// and heartbeat packets (8-byte).
pub struct MobileCodec {
  /// Largest frame (header plus body, in bytes) the decoder accepts.
  max_frame_length: usize,
  /// Compressor applied to payloads on encode and used to decompress on decode.
  pub compressor: Compressor,
  /// DES encryptor; when `None`, outgoing messages are not encrypted and
  /// incoming encrypted messages are rejected.
  encryptor: Option<DesEncryptor>,
}
365
366impl MobileCodec {
367  /// Create a new mobile codec with default settings
368  ///
369  /// Default compression threshold is 256 bytes (optimized for mobile).
370  pub fn new() -> Self {
371    Self {
372      max_frame_length: 10 * 1024 * 1024, // 10MB
373      compressor: Compressor::with_config(
374        true,
375        MOBILE_COMPRESS_THRESHOLD,
376        crate::compression::CompressionType::QuickLZ,
377      ),
378      encryptor: None,
379    }
380  }
381
382  /// Create codec with custom max frame length
383  pub fn with_max_frame_length(max_frame_length: usize) -> Self {
384    Self {
385      max_frame_length,
386      compressor: Compressor::with_config(
387        true,
388        MOBILE_COMPRESS_THRESHOLD,
389        crate::compression::CompressionType::QuickLZ,
390      ),
391      encryptor: None,
392    }
393  }
394
395  /// Create codec with custom compressor
396  pub fn with_compressor(compressor: Compressor) -> Self {
397    Self {
398      max_frame_length: 10 * 1024 * 1024,
399      compressor,
400      encryptor: None,
401    }
402  }
403
404  /// Create codec with encryption enabled
405  pub fn with_encryption(key: &[u8]) -> Result<Self> {
406    let encryptor = DesEncryptor::try_new(key)?;
407    Ok(Self {
408      max_frame_length: 10 * 1024 * 1024,
409      compressor: Compressor::with_config(
410        true,
411        MOBILE_COMPRESS_THRESHOLD,
412        crate::compression::CompressionType::QuickLZ,
413      ),
414      encryptor: Some(encryptor),
415    })
416  }
417
418  /// Create fully configured codec
419  pub fn with_full_config(
420    max_frame_length: usize,
421    compressor: Compressor,
422    encryption_key: Option<&[u8]>,
423  ) -> Result<Self> {
424    let encryptor = match encryption_key {
425      | Some(key) => Some(DesEncryptor::try_new(key)?),
426      | None => None,
427    };
428    Ok(Self {
429      max_frame_length,
430      compressor,
431      encryptor,
432    })
433  }
434
435  /// Set encryption key
436  pub fn set_encryption_key(&mut self, key: Option<&[u8]>) -> Result<()> {
437    self.encryptor = match key {
438      | Some(k) => Some(DesEncryptor::try_new(k)?),
439      | None => None,
440    };
441    Ok(())
442  }
443
444  /// Check if encryption is enabled
445  pub fn is_encryption_enabled(&self) -> bool {
446    self.encryptor.is_some()
447  }
448
449  /// Peek at buffer to determine packet type
450  fn peek_packet_type(buf: &[u8]) -> Option<PacketType> {
451    if buf.is_empty() {
452      return None;
453    }
454    match buf[0] {
455      | MOBILE_MAGIC => Some(PacketType::Message),
456      | HEARTBEAT_MAGIC => Some(PacketType::Heartbeat),
457      | _ => None,
458    }
459  }
460}
461
/// Kind of packet at the front of a buffer, as identified by its magic byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PacketType {
  /// First byte is `MOBILE_MAGIC`: a 16-byte header followed by a body.
  Message,
  /// First byte is `HEARTBEAT_MAGIC`: an 8-byte heartbeat.
  Heartbeat,
}
467
468impl Default for MobileCodec {
469  fn default() -> Self {
470    Self::new()
471  }
472}
473
474impl Decoder for MobileCodec {
475  type Error = ZusError;
476  type Item = MobilePacket;
477
478  fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
479    if src.is_empty() {
480      return Ok(None);
481    }
482
483    // Determine packet type from first byte
484    let packet_type = match Self::peek_packet_type(src) {
485      | Some(pt) => pt,
486      | None => {
487        return Err(ZusError::InvalidMagic(src[0] as u16));
488      }
489    };
490
491    match packet_type {
492      | PacketType::Heartbeat => {
493        if src.len() < HEARTBEAT_SIZE {
494          return Ok(None);
495        }
496        let heartbeat = MobileHeartbeat::decode(src)?;
497        Ok(Some(MobilePacket::Heartbeat(heartbeat)))
498      }
499      | PacketType::Message => {
500        if src.len() < MOBILE_HEADER_SIZE {
501          return Ok(None);
502        }
503
504        // Peek header to get body length
505        let mut peek_buf = src.clone();
506        let header = MobileProtocolHeader::decode(&mut peek_buf)?;
507
508        let total_length = MOBILE_HEADER_SIZE + header.body_length as usize;
509        if src.len() < total_length {
510          src.reserve(total_length - src.len());
511          return Ok(None);
512        }
513
514        // Check max frame length
515        if total_length > self.max_frame_length {
516          return Err(ZusError::Protocol(format!(
517            "Mobile frame too large: {} > {}",
518            total_length, self.max_frame_length
519          )));
520        }
521
522        // Now decode the actual header
523        let header = MobileProtocolHeader::decode(src)?;
524
525        // Read body
526        let raw_body = src.split_to(header.body_length as usize).freeze();
527
528        // Verify CRC on raw body
529        if !header.verify_datacrc(&raw_body) {
530          return Err(ZusError::CrcMismatch);
531        }
532
533        // Decrypt if needed
534        let decrypted_body = if header.is_encrypted() {
535          match &self.encryptor {
536            | Some(enc) => Bytes::from(enc.decrypt(&raw_body)?),
537            | None => {
538              return Err(ZusError::Encryption(
539                "Received encrypted message but no encryption key configured".to_string(),
540              ));
541            }
542          }
543        } else {
544          raw_body
545        };
546
547        // Decode body based on message type
548        let (method, body) = if header.msg_type == msg_types::REQUEST || header.msg_type == msg_types::NOTIFY {
549          // Request/Notify: [method_len(4)][method][params_len(4)][params]
550          let mut body_buf = decrypted_body;
551
552          if body_buf.len() < 4 {
553            return Err(ZusError::Protocol("Invalid method length".to_string()));
554          }
555          let method_len = body_buf.get_u32() as usize;
556          if body_buf.len() < method_len {
557            return Err(ZusError::Protocol("Invalid method data".to_string()));
558          }
559          let method = body_buf.split_to(method_len);
560
561          if body_buf.len() < 4 {
562            return Err(ZusError::Protocol("Invalid params length".to_string()));
563          }
564          let params_len = body_buf.get_u32() as usize;
565          if body_buf.len() < params_len {
566            return Err(ZusError::Protocol("Invalid params data".to_string()));
567          }
568          let raw_params = body_buf.split_to(params_len);
569
570          // Decompress if needed
571          let params = if header.is_compressed() {
572            self.compressor.decompress(&raw_params)?
573          } else {
574            raw_params
575          };
576
577          (method, params)
578        } else {
579          // Response: decompress entire body
580          let body_data = if header.is_encrypted() {
581            // For encrypted responses, first 4 bytes are original length
582            let mut body_buf = decrypted_body;
583            if body_buf.len() < 4 {
584              return Err(ZusError::Protocol("Invalid encrypted response".to_string()));
585            }
586            let original_len = body_buf.get_u32() as usize;
587            if body_buf.len() < original_len {
588              return Err(ZusError::Protocol("Invalid encrypted response length".to_string()));
589            }
590            body_buf.split_to(original_len)
591          } else {
592            decrypted_body
593          };
594
595          let body = if header.is_compressed() {
596            self.compressor.decompress(&body_data)?
597          } else {
598            body_data
599          };
600          (Bytes::new(), body)
601        };
602
603        Ok(Some(MobilePacket::Message(MobileMessage { header, method, body })))
604      }
605    }
606  }
607}
608
609impl Encoder<MobilePacket> for MobileCodec {
610  type Error = ZusError;
611
612  fn encode(&mut self, item: MobilePacket, dst: &mut BytesMut) -> Result<()> {
613    match item {
614      | MobilePacket::Heartbeat(hb) => {
615        dst.reserve(HEARTBEAT_SIZE);
616        hb.encode(dst);
617        Ok(())
618      }
619      | MobilePacket::Message(mut msg) => {
620        let is_request = !msg.method.is_empty();
621        let mut full_body = BytesMut::new();
622        let was_compressed: bool;
623
624        if is_request {
625          // Request: compress only params
626          let (compressed_params, params_compressed) = self.compressor.compress(&msg.body)?;
627          was_compressed = params_compressed;
628
629          full_body.put_u32(msg.method.len() as u32);
630          full_body.put(msg.method);
631          full_body.put_u32(compressed_params.len() as u32);
632          full_body.put(compressed_params);
633        } else {
634          // Response: compress entire body
635          let (compressed_body, body_compressed) = self.compressor.compress(&msg.body)?;
636          was_compressed = body_compressed;
637          full_body.put(compressed_body);
638        }
639
640        // Encrypt if enabled
641        let final_body = if let Some(ref enc) = self.encryptor {
642          let data_to_encrypt = if is_request {
643            full_body.freeze()
644          } else {
645            let mut with_len = BytesMut::with_capacity(4 + full_body.len());
646            with_len.put_u32(full_body.len() as u32);
647            with_len.put(full_body);
648            with_len.freeze()
649          };
650          let encrypted = enc.encrypt(&data_to_encrypt)?;
651          msg.header.set_encrypted(true);
652          Bytes::from(encrypted)
653        } else {
654          msg.header.set_encrypted(false);
655          full_body.freeze()
656        };
657
658        // Update header
659        msg.header.body_length = final_body.len() as u32;
660        msg.header.set_compressed(was_compressed);
661        msg.header.datacrc = MobileProtocolHeader::calculate_datacrc(&final_body);
662
663        // Reserve and encode
664        let total_length = MOBILE_HEADER_SIZE + final_body.len();
665        dst.reserve(total_length);
666        msg.header.encode(dst);
667        dst.put(final_body);
668
669        Ok(())
670      }
671    }
672  }
673}
674
675/// Encoder for MobileMessage directly (convenience)
676impl Encoder<MobileMessage> for MobileCodec {
677  type Error = ZusError;
678
679  fn encode(&mut self, item: MobileMessage, dst: &mut BytesMut) -> Result<()> {
680    <Self as Encoder<MobilePacket>>::encode(self, MobilePacket::Message(item), dst)
681  }
682}
683
684/// Encoder for MobileHeartbeat directly (convenience)
685impl Encoder<MobileHeartbeat> for MobileCodec {
686  type Error = ZusError;
687
688  fn encode(&mut self, item: MobileHeartbeat, dst: &mut BytesMut) -> Result<()> {
689    <Self as Encoder<MobilePacket>>::encode(self, MobilePacket::Heartbeat(item), dst)
690  }
691}
692
#[cfg(test)]
mod tests {
  use super::*;

  /// Encodes `packet` with `codec` into a fresh buffer and returns the buffer.
  fn encode_packet(codec: &mut MobileCodec, packet: MobilePacket) -> BytesMut {
    let mut wire = BytesMut::new();
    codec.encode(packet, &mut wire).unwrap();
    wire
  }

  /// Decodes exactly one packet from `wire`, panicking on error or on an
  /// incomplete frame.
  fn decode_packet(codec: &mut MobileCodec, wire: &mut BytesMut) -> MobilePacket {
    codec.decode(wire).unwrap().unwrap()
  }

  /// Unwraps a message packet, panicking with `otherwise` for a heartbeat.
  fn expect_message(packet: MobilePacket, otherwise: &str) -> MobileMessage {
    match packet {
      | MobilePacket::Message(msg) => msg,
      | MobilePacket::Heartbeat(_) => panic!("{}", otherwise),
    }
  }

  #[test]
  fn test_mobile_header_encode_decode() {
    let mut original = MobileProtocolHeader::new(123, msg_types::REQUEST);
    original.body_length = 100;
    original.datacrc = 0x1234;

    let mut wire = BytesMut::new();
    original.encode(&mut wire);
    assert_eq!(wire.len(), MOBILE_HEADER_SIZE);

    let parsed = MobileProtocolHeader::decode(&mut wire).unwrap();
    assert_eq!(parsed.magic, MOBILE_MAGIC);
    assert_eq!(parsed.seq_id, 123);
    assert_eq!(parsed.msg_type, msg_types::REQUEST);
    assert_eq!(parsed.body_length, 100);
    assert_eq!(parsed.datacrc, 0x1234);
  }

  #[test]
  fn test_heartbeat_encode_decode() {
    let mut wire = BytesMut::new();
    MobileHeartbeat::new().encode(&mut wire);

    assert_eq!(wire.len(), HEARTBEAT_SIZE);
    assert_eq!(wire[0], HEARTBEAT_MAGIC);

    let parsed = MobileHeartbeat::decode(&mut wire).unwrap();
    assert_eq!(parsed.magic, HEARTBEAT_MAGIC);
  }

  #[test]
  fn test_mobile_codec_message_roundtrip() {
    let mut codec = MobileCodec::new();
    let method = Bytes::from("test.method");
    let body = Bytes::from("hello mobile");

    let request = MobileMessage::new_request(42, method.clone(), body.clone());
    let mut wire = encode_packet(&mut codec, MobilePacket::Message(request));

    let received = expect_message(
      decode_packet(&mut codec, &mut wire),
      "Expected message, got heartbeat",
    );
    assert_eq!(received.header.seq_id, 42);
    assert_eq!(received.method, method);
    assert_eq!(received.body, body);
  }

  #[test]
  fn test_mobile_codec_heartbeat_roundtrip() {
    let mut codec = MobileCodec::new();
    let mut wire = encode_packet(&mut codec, MobilePacket::Heartbeat(MobileHeartbeat::new()));

    match decode_packet(&mut codec, &mut wire) {
      | MobilePacket::Heartbeat(hb) => assert_eq!(hb.magic, HEARTBEAT_MAGIC),
      | MobilePacket::Message(_) => panic!("Expected heartbeat, got message"),
    }
  }

  #[test]
  fn test_mobile_codec_with_encryption() {
    let mut codec = MobileCodec::with_encryption(b"12345678").unwrap();
    let method = Bytes::from("encrypted.call");
    let body = Bytes::from("secret data");

    let request = MobileMessage::new_request(1, method.clone(), body.clone());
    let mut wire = encode_packet(&mut codec, MobilePacket::Message(request));

    // Verify encrypted flag is set (flags live at header offset 3)
    assert_eq!(wire[3] & flags::ENCRYPTED, flags::ENCRYPTED);

    let received = expect_message(decode_packet(&mut codec, &mut wire), "Expected message");
    assert_eq!(received.method, method);
    assert_eq!(received.body, body);
  }

  #[test]
  fn test_mobile_codec_response() {
    let mut codec = MobileCodec::new();
    let body = Bytes::from("response data");

    let response = MobileMessage::new_response(99, body.clone());
    let mut wire = encode_packet(&mut codec, MobilePacket::Message(response));

    let received = expect_message(decode_packet(&mut codec, &mut wire), "Expected message");
    assert_eq!(received.header.seq_id, 99);
    assert_eq!(received.header.msg_type, msg_types::RESPONSE);
    assert!(received.method.is_empty());
    assert_eq!(received.body, body);
  }

  #[test]
  fn test_mobile_flags() {
    let mut header = MobileProtocolHeader::new(1, msg_types::REQUEST);

    // A fresh header carries no flags
    assert!(!header.is_compressed());
    assert!(!header.is_encrypted());
    assert!(!header.is_fragment());

    header.set_compressed(true);
    assert!(header.is_compressed());

    header.set_encrypted(true);
    assert!(header.is_encrypted());

    header.set_fragment(true);
    assert!(header.is_fragment());

    // All three flag bits set
    assert_eq!(header.flags, 0x07);
  }

  #[test]
  fn test_invalid_magic() {
    let mut wire = BytesMut::new();
    wire.put_u8(0xFF); // Invalid magic
    wire.put_slice(&[0u8; 15]);

    assert!(MobileProtocolHeader::decode(&mut wire).is_err());
  }

  #[test]
  fn test_is_heartbeat() {
    let heartbeat_buf = [HEARTBEAT_MAGIC, 0, 0, 0, 0, 0, 0, 0];
    let message_buf = [MOBILE_MAGIC, 0, 0, 0, 0, 0, 0, 0];

    assert!(MobileHeartbeat::is_heartbeat(&heartbeat_buf));
    assert!(!MobileHeartbeat::is_heartbeat(&message_buf));
    assert!(!MobileHeartbeat::is_heartbeat(&[]));
  }

  #[test]
  fn test_compression_threshold() {
    // Mobile uses 256-byte threshold
    let codec = MobileCodec::new();
    assert_eq!(codec.compressor.threshold_bytes, MOBILE_COMPRESS_THRESHOLD);
  }
}