use crate::error::{Error, Result};
use crate::types::{
CMPP_ACTIVE_TEST, CMPP_ACTIVE_TEST_RESP, CMPP_CONNECT, CMPP_CONNECT_RESP, CMPP_DELIVER,
CMPP_DELIVER_RESP, CMPP_HEADER_LENGTH, CMPP_SUBMIT, CMPP_SUBMIT_RESP, CMPP_TERMINATE,
CMPP_TERMINATE_RESP, CmppHeader,
};
use bytes::{BufMut, Bytes, BytesMut};
/// A CMPP message ready for the wire: a typed PDU plus the sequence id it travels under.
///
/// The command id is not stored here; it is derived from the [`Pdu`] variant on encode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame {
    /// Sequence id written into the frame header.
    pub sequence_id: u32,
    /// Typed message carried by this frame.
    pub pdu: Pdu,
}

impl Frame {
    /// Pairs `pdu` with the sequence id it should be sent under.
    pub fn new(sequence_id: u32, pdu: Pdu) -> Self {
        Self { sequence_id, pdu }
    }

    /// Serializes header and body into a single buffer by delegating to [`Pdu::encode`].
    pub fn encode(&self) -> Bytes {
        let Frame { sequence_id, pdu } = self;
        pdu.encode(*sequence_id)
    }
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Pdu {
Connect(Connect),
ConnectResp(ConnectResp),
Submit(Box<Submit>),
SubmitResp(SubmitResp),
Deliver(Deliver),
DeliverResp(DeliverResp),
ActiveTest,
ActiveTestResp,
Terminate,
TerminateResp,
}
impl Pdu {
pub fn command_id(&self) -> u32 {
match self {
Pdu::Connect(_) => CMPP_CONNECT,
Pdu::ConnectResp(_) => CMPP_CONNECT_RESP,
Pdu::Submit(_) => CMPP_SUBMIT,
Pdu::SubmitResp(_) => CMPP_SUBMIT_RESP,
Pdu::Deliver(_) => CMPP_DELIVER,
Pdu::DeliverResp(_) => CMPP_DELIVER_RESP,
Pdu::ActiveTest => CMPP_ACTIVE_TEST,
Pdu::ActiveTestResp => CMPP_ACTIVE_TEST_RESP,
Pdu::Terminate => CMPP_TERMINATE,
Pdu::TerminateResp => CMPP_TERMINATE_RESP,
}
}
pub fn encode(&self, sequence_id: u32) -> Bytes {
let mut body = BytesMut::new();
match self {
Pdu::Connect(p) => p.encode_body(&mut body),
Pdu::ConnectResp(p) => p.encode_body(&mut body),
Pdu::Submit(p) => p.encode_body(&mut body),
Pdu::SubmitResp(p) => p.encode_body(&mut body),
Pdu::Deliver(p) => p.encode_body(&mut body),
Pdu::DeliverResp(p) => p.encode_body(&mut body),
Pdu::ActiveTest | Pdu::Terminate | Pdu::TerminateResp => {}
Pdu::ActiveTestResp => body.put_u8(0),
}
frame(self.command_id(), sequence_id, &body)
}
pub fn decode(header: CmppHeader, body: &[u8]) -> Result<Pdu> {
Ok(match header.command_id {
CMPP_CONNECT => Pdu::Connect(Connect::decode(body)?),
CMPP_CONNECT_RESP => Pdu::ConnectResp(ConnectResp::decode(body)?),
CMPP_SUBMIT => Pdu::Submit(Box::new(Submit::decode(body)?)),
CMPP_SUBMIT_RESP => Pdu::SubmitResp(SubmitResp::decode(body)?),
CMPP_DELIVER => Pdu::Deliver(Deliver::decode(body)?),
CMPP_DELIVER_RESP => Pdu::DeliverResp(DeliverResp::decode(body)?),
CMPP_ACTIVE_TEST => Pdu::ActiveTest,
CMPP_ACTIVE_TEST_RESP => Pdu::ActiveTestResp,
CMPP_TERMINATE => Pdu::Terminate,
CMPP_TERMINATE_RESP => Pdu::TerminateResp,
other => return Err(Error::Decode(format!("未知 command id {:#010x}", other))),
})
}
}
/// Prepends the header (total length, command id, sequence id; all big-endian `u32`)
/// to `body` and returns the finished frame.
fn frame(command_id: u32, sequence_id: u32, body: &[u8]) -> Bytes {
    // The length field counts the header itself as well as the body.
    let total_length = (CMPP_HEADER_LENGTH + body.len()) as u32;
    let mut packet = BytesMut::with_capacity(total_length as usize);
    for word in [total_length, command_id, sequence_id] {
        packet.put_u32(word);
    }
    packet.extend_from_slice(body);
    packet.freeze()
}
/// Body of a CMPP_CONNECT message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connect {
    /// Account identifier; encoded as a 6-byte zero-padded field.
    pub source_addr: String,
    /// 16-byte MD5 digest, see [`compute_authenticator_source`].
    pub authenticator_source: [u8; 16],
    /// Protocol version byte.
    pub version: u8,
    /// Timestamp stored as the decimal number `MMDDHHMMSS`.
    pub timestamp: u32,
}

impl Connect {
    /// Builds a connect request stamped with the current time and the matching
    /// authenticator digest.
    ///
    /// NOTE(review): the timestamp is taken in UTC; confirm the peer does not expect
    /// local time.
    pub fn new(source_addr: &str, shared_secret: &str, version: u8) -> Connect {
        use chrono::{Datelike, Timelike, Utc};
        let now = Utc::now();
        // Shift two decimal digits per component: month, day, hour, minute, second
        // end up packed as MMDDHHMMSS.
        let timestamp = [
            now.month(),
            now.day(),
            now.hour(),
            now.minute(),
            now.second(),
        ]
        .into_iter()
        .fold(0u32, |packed, part| packed * 100 + part);
        Connect {
            authenticator_source: compute_authenticator_source(
                source_addr,
                shared_secret,
                timestamp,
            ),
            source_addr: source_addr.to_string(),
            version,
            timestamp,
        }
    }

    /// Appends the 27-byte body: source address (6), digest (16), version (1), timestamp (4).
    fn encode_body(&self, buf: &mut BytesMut) {
        let Self {
            source_addr,
            authenticator_source,
            version,
            timestamp,
        } = self;
        put_octet_str(buf, source_addr, 6);
        buf.put_slice(authenticator_source);
        buf.put_u8(*version);
        buf.put_u32(*timestamp);
    }

    /// Parses a connect body; fails with a decode error when `body` is too short.
    fn decode(body: &[u8]) -> Result<Connect> {
        let mut reader = BodyReader::new(body);
        let source_addr = read_octet_str(reader.take(6)?);
        let mut authenticator_source = [0u8; 16];
        authenticator_source.copy_from_slice(reader.take(16)?);
        let version = reader.u8()?;
        let timestamp = reader.u32()?;
        Ok(Connect {
            source_addr,
            authenticator_source,
            version,
            timestamp,
        })
    }
}
/// Computes the client-side authenticator:
/// `MD5(source_addr || nine zero bytes || shared_secret || timestamp)`,
/// where the timestamp is rendered as ten zero-padded decimal digits.
pub fn compute_authenticator_source(
    source_addr: &str,
    shared_secret: &str,
    timestamp: u32,
) -> [u8; 16] {
    use md5::{Digest, Md5};
    let padding = [0u8; 9];
    let stamp = format!("{:010}", timestamp);
    // MD5 is fed incrementally, so hashing the parts in order equals hashing their
    // concatenation.
    let parts: [&[u8]; 4] = [
        source_addr.as_bytes(),
        &padding,
        shared_secret.as_bytes(),
        stamp.as_bytes(),
    ];
    let mut ctx = Md5::new();
    for part in parts {
        ctx.update(part);
    }
    let digest = ctx.finalize();
    let mut authenticator = [0u8; 16];
    authenticator.copy_from_slice(&digest[..]);
    authenticator
}
/// Computes the gateway-side authenticator:
/// `MD5(status || authenticator_source || shared_secret)`, with `status` as a single byte.
pub fn compute_authenticator_ismg(
    status: u8,
    authenticator_source: &[u8; 16],
    shared_secret: &str,
) -> [u8; 16] {
    use md5::{Digest, Md5};
    let status_byte = [status];
    // MD5 is fed incrementally, so hashing the parts in order equals hashing their
    // concatenation.
    let parts: [&[u8]; 3] = [
        &status_byte,
        authenticator_source,
        shared_secret.as_bytes(),
    ];
    let mut ctx = Md5::new();
    for part in parts {
        ctx.update(part);
    }
    let digest = ctx.finalize();
    let mut authenticator = [0u8; 16];
    authenticator.copy_from_slice(&digest[..]);
    authenticator
}
/// Body of a CMPP_CONNECT_RESP message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectResp {
    /// Login result code, one byte on the wire.
    pub status: u8,
    /// 16-byte MD5 digest, see [`compute_authenticator_ismg`].
    pub authenticator_ismg: [u8; 16],
    /// Protocol version byte.
    pub version: u8,
}

impl ConnectResp {
    /// Appends the 18-byte body: status (1), digest (16), version (1).
    fn encode_body(&self, buf: &mut BytesMut) {
        let Self {
            status,
            authenticator_ismg,
            version,
        } = self;
        buf.put_u8(*status);
        buf.put_slice(authenticator_ismg);
        buf.put_u8(*version);
    }

    /// Parses a connect response body; fails with a decode error when `body` is too short.
    fn decode(body: &[u8]) -> Result<ConnectResp> {
        let mut reader = BodyReader::new(body);
        let status = reader.u8()?;
        let mut authenticator_ismg = [0u8; 16];
        authenticator_ismg.copy_from_slice(reader.take(16)?);
        let version = reader.u8()?;
        Ok(ConnectResp {
            status,
            authenticator_ismg,
            version,
        })
    }
}
/// Body of a CMPP_SUBMIT message.
///
/// String fields are fixed-width on the wire: the encoder zero-pads shorter values
/// and cuts longer ones to the field width.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Submit {
    /// 8-byte message id.
    pub msg_id: [u8; 8],
    /// One byte on the wire.
    pub pk_total: u8,
    /// One byte on the wire.
    pub pk_number: u8,
    /// One byte on the wire.
    pub registered_delivery: u8,
    /// One byte on the wire.
    pub msg_level: u8,
    /// 10-byte field.
    pub service_id: String,
    /// One byte on the wire.
    pub fee_user_type: u8,
    /// 21-byte field.
    pub fee_terminal_id: String,
    /// One byte on the wire.
    pub tp_pid: u8,
    /// One byte on the wire.
    pub tp_udhi: u8,
    /// One byte on the wire.
    pub msg_fmt: u8,
    /// 6-byte field.
    pub msg_src: String,
    /// 2-byte field.
    pub fee_type: String,
    /// 6-byte field.
    pub fee_code: String,
    /// 17-byte field.
    pub valid_time: String,
    /// 17-byte field.
    pub at_time: String,
    /// 21-byte field.
    pub src_id: String,
    /// Recipients, 21 bytes each; the count is a single byte, so at most 255 are encoded.
    pub dest_terminal_ids: Vec<String>,
    /// Raw message bytes; the length is a single byte, so at most 255 bytes are encoded.
    pub msg_content: Vec<u8>,
}

impl Submit {
    /// Appends the submit body to `buf`.
    ///
    /// The recipient count and the content length are one-byte fields. Anything past
    /// 255 recipients or 255 content bytes is left out, so the length bytes always
    /// describe exactly what follows them. Previously the length wrapped around while
    /// all data was still written, which produced a body the peer could not parse.
    /// Callers that need more must split the message themselves.
    fn encode_body(&self, buf: &mut BytesMut) {
        /// Largest value a one-byte length or count field can announce.
        const ONE_BYTE_MAX: usize = u8::MAX as usize;

        buf.put_slice(&self.msg_id);
        buf.put_u8(self.pk_total);
        buf.put_u8(self.pk_number);
        buf.put_u8(self.registered_delivery);
        buf.put_u8(self.msg_level);
        put_octet_str(buf, &self.service_id, 10);
        buf.put_u8(self.fee_user_type);
        put_octet_str(buf, &self.fee_terminal_id, 21);
        buf.put_u8(self.tp_pid);
        buf.put_u8(self.tp_udhi);
        buf.put_u8(self.msg_fmt);
        put_octet_str(buf, &self.msg_src, 6);
        put_octet_str(buf, &self.fee_type, 2);
        put_octet_str(buf, &self.fee_code, 6);
        put_octet_str(buf, &self.valid_time, 17);
        put_octet_str(buf, &self.at_time, 17);
        put_octet_str(buf, &self.src_id, 21);

        let dests = &self.dest_terminal_ids[..self.dest_terminal_ids.len().min(ONE_BYTE_MAX)];
        // Lossless: the slice was clamped to ONE_BYTE_MAX entries above.
        buf.put_u8(dests.len() as u8);
        for dest in dests {
            put_octet_str(buf, dest, 21);
        }

        let content = &self.msg_content[..self.msg_content.len().min(ONE_BYTE_MAX)];
        // Lossless: the slice was clamped to ONE_BYTE_MAX bytes above.
        buf.put_u8(content.len() as u8);
        buf.put_slice(content);

        // Eight trailing zero bytes (reserved field).
        buf.put_slice(&[0u8; 8]);
    }

    /// Parses a submit body.
    ///
    /// The eight trailing reserved bytes written by the encoder are not read, so a
    /// body that omits them still decodes.
    ///
    /// # Errors
    ///
    /// Returns a decode error when `body` ends before all announced fields are read.
    fn decode(body: &[u8]) -> Result<Submit> {
        let mut r = BodyReader::new(body);
        let mut msg_id = [0u8; 8];
        msg_id.copy_from_slice(r.take(8)?);
        let pk_total = r.u8()?;
        let pk_number = r.u8()?;
        let registered_delivery = r.u8()?;
        let msg_level = r.u8()?;
        let service_id = read_octet_str(r.take(10)?);
        let fee_user_type = r.u8()?;
        let fee_terminal_id = read_octet_str(r.take(21)?);
        let tp_pid = r.u8()?;
        let tp_udhi = r.u8()?;
        let msg_fmt = r.u8()?;
        let msg_src = read_octet_str(r.take(6)?);
        let fee_type = read_octet_str(r.take(2)?);
        let fee_code = read_octet_str(r.take(6)?);
        let valid_time = read_octet_str(r.take(17)?);
        let at_time = read_octet_str(r.take(17)?);
        let src_id = read_octet_str(r.take(21)?);
        let dest_count = r.u8()? as usize;
        let mut dest_terminal_ids = Vec::with_capacity(dest_count);
        for _ in 0..dest_count {
            dest_terminal_ids.push(read_octet_str(r.take(21)?));
        }
        let msg_length = r.u8()? as usize;
        let msg_content = r.take(msg_length)?.to_vec();
        Ok(Submit {
            msg_id,
            pk_total,
            pk_number,
            registered_delivery,
            msg_level,
            service_id,
            fee_user_type,
            fee_terminal_id,
            tp_pid,
            tp_udhi,
            msg_fmt,
            msg_src,
            fee_type,
            fee_code,
            valid_time,
            at_time,
            src_id,
            dest_terminal_ids,
            msg_content,
        })
    }
}
/// Body of a CMPP_SUBMIT_RESP message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubmitResp {
    /// 8-byte message id.
    pub msg_id: [u8; 8],
    /// Result code, one byte on the wire.
    pub result: u8,
}

impl SubmitResp {
    /// Appends the 9-byte body: message id (8) followed by the result byte.
    fn encode_body(&self, buf: &mut BytesMut) {
        let Self { msg_id, result } = self;
        buf.put_slice(msg_id);
        buf.put_u8(*result);
    }

    /// Parses a submit response body; fails with a decode error when `body` is too short.
    fn decode(body: &[u8]) -> Result<SubmitResp> {
        let mut reader = BodyReader::new(body);
        let mut msg_id = [0u8; 8];
        msg_id.copy_from_slice(reader.take(8)?);
        let result = reader.u8()?;
        Ok(SubmitResp { msg_id, result })
    }
}
/// Body of a CMPP_DELIVER message.
///
/// String fields are fixed-width on the wire: the encoder zero-pads shorter values
/// and cuts longer ones to the field width.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Deliver {
    /// 8-byte message id.
    pub msg_id: [u8; 8],
    /// 21-byte field.
    pub dest_id: String,
    /// 10-byte field.
    pub service_id: String,
    /// One byte on the wire.
    pub tp_pid: u8,
    /// One byte on the wire.
    pub tp_udhi: u8,
    /// One byte on the wire.
    pub msg_fmt: u8,
    /// 21-byte field.
    pub src_terminal_id: String,
    /// When this is `1`, `msg_content` is treated as a status report (see [`Deliver::report`]).
    pub registered_delivery: u8,
    /// Raw message bytes; the length is a single byte, so at most 255 bytes are encoded.
    pub msg_content: Vec<u8>,
}

impl Deliver {
    /// Interprets the content as a status report.
    ///
    /// Returns `None` unless `registered_delivery == 1` and the content is long
    /// enough to hold a report.
    pub fn report(&self) -> Option<DeliverReport> {
        if self.registered_delivery != 1 {
            return None;
        }
        DeliverReport::parse(&self.msg_content)
    }

    /// Appends the deliver body to `buf`.
    ///
    /// The content length is a one-byte field. Content past 255 bytes is left out,
    /// so the length byte always describes exactly what follows it. Previously the
    /// length wrapped around while all bytes were still written, which produced a
    /// body the peer could not parse.
    fn encode_body(&self, buf: &mut BytesMut) {
        buf.put_slice(&self.msg_id);
        put_octet_str(buf, &self.dest_id, 21);
        put_octet_str(buf, &self.service_id, 10);
        buf.put_u8(self.tp_pid);
        buf.put_u8(self.tp_udhi);
        buf.put_u8(self.msg_fmt);
        put_octet_str(buf, &self.src_terminal_id, 21);
        buf.put_u8(self.registered_delivery);

        let content = &self.msg_content[..self.msg_content.len().min(u8::MAX as usize)];
        // Lossless: the slice was clamped to `u8::MAX` bytes above.
        buf.put_u8(content.len() as u8);
        buf.put_slice(content);

        // Eight trailing zero bytes (reserved field).
        buf.put_slice(&[0u8; 8]);
    }

    /// Parses a deliver body.
    ///
    /// The eight trailing reserved bytes written by the encoder are not read, so a
    /// body that omits them still decodes.
    ///
    /// # Errors
    ///
    /// Returns a decode error when `body` ends before all announced fields are read.
    fn decode(body: &[u8]) -> Result<Deliver> {
        let mut r = BodyReader::new(body);
        let mut msg_id = [0u8; 8];
        msg_id.copy_from_slice(r.take(8)?);
        let dest_id = read_octet_str(r.take(21)?);
        let service_id = read_octet_str(r.take(10)?);
        let tp_pid = r.u8()?;
        let tp_udhi = r.u8()?;
        let msg_fmt = r.u8()?;
        let src_terminal_id = read_octet_str(r.take(21)?);
        let registered_delivery = r.u8()?;
        let msg_length = r.u8()? as usize;
        let msg_content = r.take(msg_length)?.to_vec();
        Ok(Deliver {
            msg_id,
            dest_id,
            service_id,
            tp_pid,
            tp_udhi,
            msg_fmt,
            src_terminal_id,
            registered_delivery,
            msg_content,
        })
    }
}
/// Body of a CMPP_DELIVER_RESP message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliverResp {
    /// 8-byte message id.
    pub msg_id: [u8; 8],
    /// Result code, one byte on the wire.
    pub result: u8,
}

impl DeliverResp {
    /// Appends the 9-byte body: message id (8) followed by the result byte.
    fn encode_body(&self, buf: &mut BytesMut) {
        let Self { msg_id, result } = self;
        buf.put_slice(msg_id);
        buf.put_u8(*result);
    }

    /// Parses a deliver response body; fails with a decode error when `body` is too short.
    fn decode(body: &[u8]) -> Result<DeliverResp> {
        let mut reader = BodyReader::new(body);
        let mut msg_id = [0u8; 8];
        msg_id.copy_from_slice(reader.take(8)?);
        let result = reader.u8()?;
        Ok(DeliverResp { msg_id, result })
    }
}
/// Status report carried in the content of a deliver message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliverReport {
    /// Bytes 0..8 of the report.
    pub msg_id: [u8; 8],
    /// Bytes 8..15: status text (the tests in this file use `DELIVRD`).
    pub stat: String,
    /// Bytes 15..25.
    pub submit_time: String,
    /// Bytes 25..35.
    pub done_time: String,
    /// Bytes 35..56.
    pub dest_terminal_id: String,
    /// Bytes 56..60, big-endian.
    pub smsc_sequence: u32,
}

impl DeliverReport {
    /// Renders the message id as 16 lowercase hexadecimal digits.
    pub fn msg_id_hex(&self) -> String {
        let mut hex = String::with_capacity(self.msg_id.len() * 2);
        for byte in &self.msg_id {
            hex.push_str(&format!("{:02x}", byte));
        }
        hex
    }

    /// Parses the fixed 60-byte report layout from the front of `content`.
    ///
    /// Returns `None` when fewer than 60 bytes are available; extra trailing bytes
    /// are ignored.
    pub fn parse(content: &[u8]) -> Option<DeliverReport> {
        const REPORT_LEN: usize = 8 + 7 + 10 + 10 + 21 + 4;
        let report = content.get(..REPORT_LEN)?;

        // Peel the fields off the front one after another.
        let (id, rest) = report.split_at(8);
        let (stat, rest) = rest.split_at(7);
        let (submit_time, rest) = rest.split_at(10);
        let (done_time, rest) = rest.split_at(10);
        let (dest_terminal_id, sequence) = rest.split_at(21);

        let mut msg_id = [0u8; 8];
        msg_id.copy_from_slice(id);
        let mut sequence_bytes = [0u8; 4];
        sequence_bytes.copy_from_slice(sequence);

        Some(DeliverReport {
            msg_id,
            stat: read_octet_str(stat),
            submit_time: read_octet_str(submit_time),
            done_time: read_octet_str(done_time),
            dest_terminal_id: read_octet_str(dest_terminal_id),
            smsc_sequence: u32::from_be_bytes(sequence_bytes),
        })
    }
}
/// Writes `s` as a fixed-width field of exactly `len` bytes.
///
/// Values longer than `len` bytes are cut at the byte level; shorter ones are
/// padded on the right with zero bytes.
fn put_octet_str(buf: &mut BytesMut, s: &str, len: usize) {
    let field_end = buf.len() + len;
    let kept = s.len().min(len);
    buf.put_slice(&s.as_bytes()[..kept]);
    // Growing to the precomputed end zero-fills whatever the value did not cover.
    buf.resize(field_end, 0);
}
/// Reads a fixed-width field back into a `String`.
///
/// Trailing zero bytes and ASCII spaces are dropped; anything before them, including
/// embedded zeros, is kept. Invalid UTF-8 is replaced with U+FFFD.
fn read_octet_str(bytes: &[u8]) -> String {
    let mut trimmed = bytes;
    while let Some((&last, head)) = trimmed.split_last() {
        if last != 0 && last != b' ' {
            break;
        }
        trimmed = head;
    }
    String::from_utf8_lossy(trimmed).into_owned()
}
/// Forward-only cursor over a PDU body that turns short reads into decode errors.
struct BodyReader<'a> {
    /// The whole body being decoded.
    buf: &'a [u8],
    /// Offset of the next unread byte; never exceeds `buf.len()`.
    pos: usize,
}

impl<'a> BodyReader<'a> {
    /// Starts reading at the beginning of `buf`.
    fn new(buf: &'a [u8]) -> Self {
        BodyReader { buf, pos: 0 }
    }

    /// Returns the next `n` bytes and advances past them.
    ///
    /// # Errors
    ///
    /// Returns `Error::Decode` when fewer than `n` bytes remain. The cursor is left
    /// untouched in that case, so a smaller read may still succeed afterwards.
    fn take(&mut self, n: usize) -> Result<&'a [u8]> {
        let buf: &'a [u8] = self.buf;
        let start = self.pos;
        // `checked_add` keeps an absurdly large `n` from overflowing `usize`; it is
        // reported as a short body like any other over-long request.
        let end = match start.checked_add(n) {
            Some(end) if end <= buf.len() => end,
            _ => {
                return Err(Error::Decode(format!(
                    "body 意外结束: 需要 {} bytes(offset {}),实际有 {}",
                    n,
                    start,
                    buf.len()
                )));
            }
        };
        self.pos = end;
        Ok(&buf[start..end])
    }

    /// Reads one byte.
    fn u8(&mut self) -> Result<u8> {
        Ok(self.take(1)?[0])
    }

    /// Reads a big-endian `u32`.
    fn u32(&mut self) -> Result<u32> {
        let mut word = [0u8; 4];
        word.copy_from_slice(self.take(4)?);
        Ok(u32::from_be_bytes(word))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `pdu`, re-parses the header by hand and decodes the body again.
    ///
    /// Also checks that the length field matches the real frame size and that the
    /// sequence id survives the trip.
    fn round_trip(pdu: Pdu, seq: u32) -> Pdu {
        let bytes = pdu.encode(seq);
        let total = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
        assert_eq!(total, bytes.len());
        let header = CmppHeader {
            total_length: total as u32,
            command_id: u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
            sequence_id: u32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]),
        };
        assert_eq!(header.sequence_id, seq);
        Pdu::decode(header, &bytes[CMPP_HEADER_LENGTH..]).unwrap()
    }

    /// Independent MD5 of a single pre-assembled buffer, used to cross-check the
    /// authenticator helpers.
    fn md5_of(data: &[u8]) -> Vec<u8> {
        use md5::{Digest, Md5};
        let mut hasher = Md5::new();
        hasher.update(data);
        hasher.finalize()[..].to_vec()
    }

    #[test]
    fn connect_round_trip() {
        let c = Connect::new("901234", "secret", 0x20);
        let decoded = round_trip(Pdu::Connect(c.clone()), 1);
        assert_eq!(decoded, Pdu::Connect(c));
    }

    #[test]
    fn connect_resp_round_trip() {
        let r = ConnectResp {
            status: 0,
            authenticator_ismg: [7u8; 16],
            version: 0x20,
        };
        assert_eq!(
            round_trip(Pdu::ConnectResp(r.clone()), 2),
            Pdu::ConnectResp(r)
        );
    }

    #[test]
    fn submit_round_trip_multi_dest() {
        let s = Submit {
            msg_id: [0u8; 8],
            pk_total: 1,
            pk_number: 1,
            registered_delivery: 1,
            msg_level: 0,
            service_id: "SVC".into(),
            fee_user_type: 2,
            fee_terminal_id: String::new(),
            tp_pid: 0,
            tp_udhi: 0,
            msg_fmt: 8,
            msg_src: "901234".into(),
            fee_type: "01".into(),
            fee_code: "000000".into(),
            valid_time: String::new(),
            at_time: String::new(),
            src_id: "10690001".into(),
            dest_terminal_ids: vec!["13800138000".into(), "13800138001".into()],
            msg_content: vec![0x4f, 0x60],
        };
        assert_eq!(
            round_trip(Pdu::Submit(Box::new(s.clone())), 3),
            Pdu::Submit(Box::new(s))
        );
    }

    #[test]
    fn submit_resp_round_trip() {
        let r = SubmitResp {
            msg_id: [1, 2, 3, 4, 5, 6, 7, 8],
            result: 0,
        };
        assert_eq!(
            round_trip(Pdu::SubmitResp(r.clone()), 4),
            Pdu::SubmitResp(r)
        );
    }

    #[test]
    fn deliver_report_parse() {
        // Assemble a 60-byte report: id, stat, submit time, done time, padded
        // destination, sequence number.
        let mut content = Vec::new();
        content.extend_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
        content.extend_from_slice(b"DELIVRD");
        content.extend_from_slice(b"2406061200");
        content.extend_from_slice(b"2406061201");
        let mut dest = b"13800138000".to_vec();
        dest.resize(21, 0);
        content.extend_from_slice(&dest);
        content.extend_from_slice(&42u32.to_be_bytes());

        let d = Deliver {
            msg_id: [0u8; 8],
            dest_id: "10690001".into(),
            service_id: "SVC".into(),
            tp_pid: 0,
            tp_udhi: 0,
            msg_fmt: 0,
            src_terminal_id: "13800138000".into(),
            registered_delivery: 1,
            msg_content: content,
        };
        let report = d.report().expect("应能解析 report");
        assert_eq!(report.msg_id, [1, 2, 3, 4, 5, 6, 7, 8]);
        assert_eq!(report.stat, "DELIVRD");
        assert_eq!(report.dest_terminal_id, "13800138000");
        assert_eq!(report.smsc_sequence, 42);

        let rt = round_trip(Pdu::Deliver(d.clone()), 5);
        assert_eq!(rt, Pdu::Deliver(d));
    }

    #[test]
    fn empty_body_pdus_round_trip() {
        assert_eq!(round_trip(Pdu::ActiveTest, 6), Pdu::ActiveTest);
        assert_eq!(round_trip(Pdu::ActiveTestResp, 7), Pdu::ActiveTestResp);
        assert_eq!(round_trip(Pdu::Terminate, 8), Pdu::Terminate);
        assert_eq!(round_trip(Pdu::TerminateResp, 9), Pdu::TerminateResp);
    }

    #[test]
    fn authenticator_ismg_matches_known_formula() {
        // Source digest: address, nine zero bytes, secret, timestamp as ten digits.
        let src = compute_authenticator_source("901234", "secret", 123456789);
        let mut src_input = b"901234".to_vec();
        src_input.extend_from_slice(&[0u8; 9]);
        src_input.extend_from_slice(b"secret");
        src_input.extend_from_slice(b"0123456789");
        assert_eq!(src.to_vec(), md5_of(&src_input));

        // Gateway digest: status byte, source digest, secret.
        let ismg = compute_authenticator_ismg(0, &src, "secret");
        let mut ismg_input = vec![0u8];
        ismg_input.extend_from_slice(&src);
        ismg_input.extend_from_slice(b"secret");
        assert_eq!(ismg.to_vec(), md5_of(&ismg_input));

        // A different status must change the digest.
        assert_ne!(ismg, compute_authenticator_ismg(1, &src, "secret"));
    }

    #[test]
    fn decode_rejects_short_body() {
        let err = SubmitResp::decode(&[0u8; 3]);
        assert!(err.is_err());
    }
}