use std::fmt;
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Debug)]
pub struct S7CommPacket<'a> {
pub tpkt: TpktHeader,
pub cotp: CotpHeader,
pub s7_header: S7Header,
pub parameter: S7Parameter<'a>,
pub payload: Option<&'a [u8]>,
}
#[derive(Debug)]
pub struct TpktHeader {
pub version: u8,
pub reserved: u8,
pub length: u16,
}
#[derive(Debug)]
pub struct CotpHeader {
pub length: u8,
pub pdu_type: u8,
pub destination_reference: u16,
pub source_reference: u16,
pub last_data_unit: bool,
}
#[derive(Debug)]
pub struct S7Header {
pub protocol_id: u8,
pub rosctr: u8,
pub reserved: u16,
pub pduref: u16,
pub parameter_length: u16,
pub data_length: u16,
pub error_class: Option<u8>,
pub error_code: Option<u8>,
}
#[derive(Debug)]
pub struct S7Parameter<'a> {
pub function: u8,
pub items: Vec<S7ParameterItem<'a>>,
}
#[derive(Debug)]
pub struct S7ParameterItem<'a> {
pub spec_type: u8,
pub length: u8,
pub syntax_id: u8,
pub transport_size: u8,
pub db_number: u16,
pub area: u8,
pub address: u32,
pub raw: Option<&'a [u8]>,
}
impl<'a> S7CommPacket<'a> {
pub const MIN_SIZES: usize = 4 + 3 + 10;
pub fn try_from(packet: &'a [u8]) -> Result<Self, &'static str> {
if packet.len() < Self::MIN_SIZES {
return Err("Packet too short for S7Comm header");
}
if packet[0] != 0x03 {
return Err("Invalid TPKT version, expected 0x03");
}
let tpkt = TpktHeader {
version: packet[0],
reserved: packet[1],
length: u16::from_be_bytes([packet[2], packet[3]]),
};
let cotp_len = packet[4] as usize;
if 4 + cotp_len + 1 > packet.len() {
return Err("Invalid COTP header length");
}
let cotp = CotpHeader {
length: packet[4],
pdu_type: packet[5],
destination_reference: u16::from_be_bytes([packet[6], packet[7]]),
source_reference: u16::from_be_bytes([packet[8], packet[9]]),
last_data_unit: (packet[10] & 0x80) != 0,
};
let s7_start = 4 + cotp.length as usize + 1;
if s7_start + 10 > packet.len() {
return Err("Packet too short for S7 header");
}
let mut s7_header = S7Header {
protocol_id: packet[s7_start],
rosctr: packet[s7_start + 1],
reserved: u16::from_be_bytes([packet[s7_start + 2], packet[s7_start + 3]]),
pduref: u16::from_be_bytes([packet[s7_start + 4], packet[s7_start + 5]]),
parameter_length: u16::from_be_bytes([packet[s7_start + 6], packet[s7_start + 7]]),
data_length: u16::from_be_bytes([packet[s7_start + 8], packet[s7_start + 9]]),
error_class: None,
error_code: None,
};
if s7_header.rosctr == 0x03 && s7_start + 11 < packet.len() {
s7_header.error_class = Some(packet[s7_start + 10]);
s7_header.error_code = Some(packet[s7_start + 11]);
}
let s7_header_end = std::cmp::min(s7_start + 12, packet.len());
for _byte in packet.iter().take(s7_header_end).skip(s7_start) {
}
let s7_header_length = if s7_header.rosctr == 0x03 { 12 } else { 10 };
let param_start = s7_start + s7_header_length;
let parameter = if s7_header.parameter_length > 0 {
let param_end = param_start + s7_header.parameter_length as usize;
if param_end > packet.len() {
return Err("Invalid parameter length");
}
Self::parse_parameter(&packet[param_start..param_end])?
} else {
S7Parameter {
function: 0,
items: Vec::new(),
}
};
let payload = if s7_header.data_length > 0 {
let data_start = param_start + s7_header.parameter_length as usize;
let data_end = data_start + s7_header.data_length as usize;
if data_end > packet.len() {
return Err("Invalid data length");
}
Some(&packet[data_start..data_end])
} else {
None
};
Ok(S7CommPacket {
tpkt,
cotp,
s7_header,
parameter,
payload,
})
}
fn parse_parameter(data: &'a [u8]) -> Result<S7Parameter<'a>, &'static str> {
if data.is_empty() {
return Err("Empty parameter data");
}
if data.len() == 1 {
return Ok(S7Parameter {
function: data[0],
items: Vec::new(),
});
}
let function = data[0];
let item_count = data[1] as usize;
let mut items = Vec::with_capacity(item_count);
let mut offset = 2;
for _ in 0..item_count {
if offset + 2 > data.len() {
return Err("Invalid parameter item header");
}
let spec_type = data[offset];
let length = data[offset + 1] as usize;
if offset + 2 + length > data.len() {
return Err("Invalid parameter item length");
}
if spec_type == 0x12 && length >= 0x0A {
if offset + 12 > data.len() {
return Err("S7ANY parameter too short");
}
let syntax_id = data[offset + 2];
let transport_size = data[offset + 3];
let db_number = u16::from_be_bytes([data[offset + 5], data[offset + 6]]);
let area = data[offset + 7];
let address = ((data[offset + 8] as u32) << 16)
| ((data[offset + 9] as u32) << 8)
| (data[offset + 10] as u32);
items.push(S7ParameterItem {
spec_type,
length: length as u8,
syntax_id,
transport_size,
db_number,
area,
address,
raw: Some(&data[offset..offset + 2 + length]),
});
} else {
items.push(S7ParameterItem {
spec_type,
length: length as u8,
syntax_id: 0,
transport_size: 0,
db_number: 0,
area: 0,
address: 0,
raw: Some(&data[offset..offset + 2 + length]),
});
}
offset += 2 + length;
}
Ok(S7Parameter { function, items })
}
}
impl<'a> fmt::Display for S7CommPacket<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "S7Comm Protocol ")
}
}
#[cfg(test)]
mod tests {
use super::*;
use hex;
#[test]
fn test_s7comm_try_from() {
let hex_str = "0300001f02f080320100000013000e00000401120a10020001000083000000";
let bytes = hex::decode(hex_str).expect("Failed to decode hex string");
let result = S7CommPacket::try_from(&bytes[..]);
assert!(
result.is_ok(),
"Failed to parse S7Comm packet: {:?}",
result.err().unwrap()
);
}
#[test]
fn test_s7comm_parse() {
let hex_str = "0300003102f080320100000e00002000001a00010000000000095f30413030303031500d31303030353030303030343030";
let bytes = hex::decode(hex_str).expect("Failed to decode hex string");
let result = S7CommPacket::try_from(&bytes[..]);
assert!(
result.is_ok(),
"Failed to parse S7Comm packet: {:?}",
result.err().unwrap()
);
}
#[test]
fn test_parameter_request_download() {
let hex_str = "0300001402f080320300000e000001000000001a";
let bytes = hex::decode(hex_str).expect("Failed to decode hex string");
let result = S7CommPacket::try_from(&bytes[..]);
assert!(
result.is_ok(),
"Failed to parse S7Comm packet: {:?}",
result.err().unwrap()
);
}
}