use std::fmt;
use crate::{
checks::application::copt::{
validate_connection_header_len, validate_declared_len, validate_min_len,
validate_parameter_header, validate_parameter_len, validate_tpdu_number_not_empty,
},
errors::application::copt::CotpParseError,
};
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u8)]
pub enum CotpPduType {
Data = 0xF0,
ConnectionRequest = 0xE0,
ConnectionConfirm = 0xD0,
DisconnectRequest = 0x80,
DisconnectConfirm = 0xC0,
TpduError = 0x70,
Other(u8),
}
impl From<u8> for CotpPduType {
fn from(value: u8) -> Self {
match value {
0xF0 => CotpPduType::Data,
0xE0 => CotpPduType::ConnectionRequest,
0xD0 => CotpPduType::ConnectionConfirm,
0x80 => CotpPduType::DisconnectRequest,
0xC0 => CotpPduType::DisconnectConfirm,
0x70 => CotpPduType::TpduError,
x => CotpPduType::Other(x),
}
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Debug, Clone)]
pub struct CotpHeader {
pub length: u8,
pub pdu_type: CotpPduType,
pub dst_ref: u16,
pub src_ref: u16,
pub class: u8,
pub extended_formats: bool,
pub no_explicit_flow_control: bool,
pub parameters: Vec<CotpParameter>,
}
#[derive(Debug, Clone)]
pub enum CotpParameter {
TpduSize(u8), SrcTsap(u16), DstTsap(u16), TpduNumber(u8), Eot(bool), Other(u8, Vec<u8>), }
impl CotpHeader {
pub const MIN_SIZE: usize = 7;
pub fn from_bytes(data: &[u8]) -> Result<(Self, usize), CotpParseError> {
validate_min_len(data)?;
let length = data[0];
let pdu_type = CotpPduType::from(data[1]);
let declared_end = length as usize + 1;
validate_declared_len(data.len(), declared_end)?;
let mut offset = 2; let (dst_ref, src_ref, class, extended_formats, no_explicit_flow_control) = match pdu_type {
CotpPduType::ConnectionRequest
| CotpPduType::ConnectionConfirm
| CotpPduType::DisconnectRequest
| CotpPduType::DisconnectConfirm
| CotpPduType::TpduError => {
let expected = offset + 5;
validate_connection_header_len(declared_end, expected)?;
let dst_ref = u16::from_be_bytes([data[offset], data[offset + 1]]);
let src_ref = u16::from_be_bytes([data[offset + 2], data[offset + 3]]);
let class = data[offset + 4] >> 4;
let extended_formats = (data[offset + 4] & 0x04) != 0;
let no_explicit_flow_control = (data[offset + 4] & 0x01) != 0;
offset += 5;
(
dst_ref,
src_ref,
class,
extended_formats,
no_explicit_flow_control,
)
}
_ => (0, 0, 0, false, false),
};
let mut parameters = Vec::new();
while offset < declared_end {
validate_parameter_header(declared_end, offset)?;
let param_type = data[offset];
let param_len = data[offset + 1] as usize;
validate_parameter_len(declared_end, offset, param_len)?;
let param_data = &data[offset + 2..offset + 2 + param_len];
let param = match param_type {
0xC0 => {
if pdu_type == CotpPduType::Data {
validate_tpdu_number_not_empty(offset, param_data.len())?;
CotpParameter::TpduNumber(param_data[0])
} else if param_len == 1 {
CotpParameter::TpduSize(param_data[0])
} else {
CotpParameter::Other(param_type, param_data.to_vec())
}
}
0xC1 if param_len == 2 => {
CotpParameter::SrcTsap(u16::from_be_bytes([param_data[0], param_data[1]]))
}
0xC2 if param_len == 2 => {
CotpParameter::DstTsap(u16::from_be_bytes([param_data[0], param_data[1]]))
}
0x80 if pdu_type == CotpPduType::Data && param_len == 0 => {
CotpParameter::Eot(true)
}
_ => CotpParameter::Other(param_type, param_data.to_vec()),
};
parameters.push(param);
offset += 2 + param_len;
}
Ok((
Self {
length,
pdu_type,
dst_ref,
src_ref,
class,
extended_formats,
no_explicit_flow_control,
parameters,
},
offset,
))
}
}
impl TryFrom<&[u8]> for CotpHeader {
type Error = CotpParseError;
fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
let (header, _) = Self::from_bytes(data)?;
Ok(header)
}
}
impl fmt::Display for CotpHeader {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let type_str = match self.pdu_type {
CotpPduType::Data => "Data (DT)",
CotpPduType::ConnectionRequest => "Connection Request (CR)",
CotpPduType::ConnectionConfirm => "Connection Confirm (CC)",
CotpPduType::DisconnectRequest => "Disconnect Request (DR)",
CotpPduType::DisconnectConfirm => "Disconnect Confirm (DC)",
CotpPduType::TpduError => "TPDU Error (ER)",
CotpPduType::Other(code) => return write!(f, "Unknown PDU Type: 0x{code:02X}"),
};
writeln!(f, "COTP: {type_str}")?;
writeln!(f, " Length: {}", self.length)?;
writeln!(f, " Destination reference: 0x{:04X}", self.dst_ref)?;
writeln!(f, " Source reference: 0x{:04X}", self.src_ref)?;
if self.class != 0 {
writeln!(f, " Class: {}", self.class)?;
}
if self.extended_formats {
writeln!(f, " Extended formats: True")?;
}
if self.no_explicit_flow_control {
writeln!(f, " No explicit flow control: True")?;
}
for param in &self.parameters {
match param {
CotpParameter::TpduSize(size) => {
let tpdu_size = match size {
0x09 => 512,
0x0A => 1024,
0x0B => 2048,
0x0C => 4096,
0x0D => 8192,
_ => 1 << (*size as u16 + 6),
};
writeln!(f, " TPDU size: {tpdu_size} bytes")?;
}
CotpParameter::SrcTsap(tsap) => {
writeln!(f, " Source TSAP: 0x{tsap:04X}")?;
}
CotpParameter::DstTsap(tsap) => {
writeln!(f, " Destination TSAP: 0x{tsap:04X}")?;
}
CotpParameter::TpduNumber(num) => {
writeln!(f, " TPDU Number: {num}")?;
}
CotpParameter::Eot(_) => {
writeln!(f, " End of TSDU: Yes")?;
}
CotpParameter::Other(code, data) => {
write!(f, " Parameter 0x{code:02X}: ")?;
for byte in data {
write!(f, "{byte:02X} ")?;
}
writeln!(f)?;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_cc_packet() {
let data = [
0x11, 0xD0, 0x00, 0x01, 0x00, 0x03, 0x00, 0xC0, 0x01, 0x09, 0xC1, 0x02, 0x01, 0x00, 0xC2, 0x02, 0x01, 0x02, ];
let (header, bytes_parsed) = CotpHeader::from_bytes(&data).unwrap();
assert_eq!(bytes_parsed, data.len());
assert!(matches!(header.pdu_type, CotpPduType::ConnectionConfirm));
assert_eq!(header.dst_ref, 0x0001);
assert_eq!(header.src_ref, 0x0003);
let mut has_tpdu_size = false;
let mut has_src_tsap = false;
let mut has_dst_tsap = false;
for param in &header.parameters {
match param {
CotpParameter::TpduSize(0x09) => has_tpdu_size = true,
CotpParameter::SrcTsap(0x0100) => has_src_tsap = true,
CotpParameter::DstTsap(0x0102) => has_dst_tsap = true,
_ => {}
}
}
assert!(has_tpdu_size);
assert!(has_src_tsap);
assert!(has_dst_tsap);
}
}