use bytes::{Bytes, BytesMut};
use rusty_modbus_codec::{DecodeError, ResponsePdu, decode_response};
use rusty_modbus_frame::owned::{
OwnedDiagnosticsResponse, OwnedEncapsulatedInterfaceResponse, OwnedGetCommEventLogResponse,
OwnedReadCoilsResponse, OwnedReadDiscreteInputsResponse, OwnedReadFifoQueueResponse,
OwnedReadFileRecordResponse, OwnedReadHoldingRegistersResponse,
OwnedReadInputRegistersResponse, OwnedReadWriteMultipleRegistersResponse,
OwnedReportServerIdResponse, OwnedWriteFileRecordResponse,
};
use rusty_modbus_frame::{MbapCodec, OwnedResponsePdu};
use rusty_modbus_types::MbapHeader;
use tokio_util::codec::Decoder;
use zerocopy::IntoBytes;
/// Test helper: returns the byte representation of an [`MbapHeader`] followed
/// by the bytes of `pdu`.
///
/// `transaction_id`, `unit_id` and the PDU length are passed straight to
/// [`MbapHeader::new`]; how the header encodes them is up to that constructor.
///
/// # Panics
///
/// Panics if `pdu.len()` does not fit in a `u16`. A plain `as u16` cast would
/// silently truncate the length and produce a header that disagrees with the
/// payload actually appended.
fn build_mbap_adu(transaction_id: u16, unit_id: u8, pdu: &[u8]) -> Vec<u8> {
    let pdu_len = u16::try_from(pdu.len()).expect("PDU length must fit in a u16");
    let header = MbapHeader::new(transaction_id, unit_id, pdu_len);

    // Size the buffer from the serialized header rather than a hard-coded
    // constant so the capacity always matches what is written below.
    let header_bytes = header.as_bytes();
    let mut buf = Vec::with_capacity(header_bytes.len() + pdu.len());
    buf.extend_from_slice(header_bytes);
    buf.extend_from_slice(pdu);
    buf
}
/// Decodes one MBAP-framed read-holding-registers response and checks the
/// borrowed `ResponsePdu` view produced from the frame's PDU bytes.
#[test]
fn end_to_end_tcp_read_holding_registers_response() {
    // Function code 0x03, byte count 6, then three big-endian register values.
    let pdu = [0x03, 0x06, 0x02, 0x2B, 0x00, 0x00, 0x00, 0x64];
    let mut src = BytesMut::from(build_mbap_adu(0x0001, 0xFF, &pdu).as_slice());

    let mut codec = MbapCodec;
    let frame = codec
        .decode(&mut src)
        .unwrap()
        .expect("should decode a frame");
    assert_eq!(frame.unit_id(), 0xFF);
    assert_eq!(frame.pdu.len(), 8);

    match decode_response(&frame.pdu).unwrap() {
        ResponsePdu::ReadHoldingRegisters(rhr) => {
            assert_eq!(rhr.count(), 3);
            // (register index, expected value) pairs.
            for (index, expected) in [(0, 0x022B), (1, 0x0000), (2, 0x0064)] {
                assert_eq!(rhr.register(index), expected);
            }
        }
        other => panic!("unexpected response: {other:?}"),
    }
}
/// Decodes one MBAP frame, then converts its PDU into an `OwnedResponsePdu`
/// and checks the register contents through the owned type.
#[test]
fn end_to_end_owned_response_conversion() {
    // Same read-holding-registers payload as the borrowed-decode test:
    // three registers encoded big-endian after the byte count.
    let pdu = [0x03, 0x06, 0x02, 0x2B, 0x00, 0x00, 0x00, 0x64];
    let mut src = BytesMut::from(build_mbap_adu(0x0001, 0xFF, &pdu).as_slice());

    let mut codec = MbapCodec;
    let frame = codec.decode(&mut src).unwrap().unwrap();

    // `from_pdu` takes the frame's PDU by value.
    match OwnedResponsePdu::from_pdu(frame.pdu).unwrap() {
        OwnedResponsePdu::ReadHoldingRegisters(rhr) => {
            assert_eq!(rhr.count(), 3);
            // (register index, expected value) pairs.
            for (index, expected) in [(0, 0x022B), (1, 0x0000), (2, 0x0064)] {
                assert_eq!(rhr.register(index), expected);
            }
        }
        other => panic!("unexpected owned response: {other:?}"),
    }
}
/// Decodes an MBAP-framed two-byte PDU and checks that it is reported as an
/// exception carrying the expected function and exception codes.
#[test]
fn end_to_end_exception_response() {
    use rusty_modbus_types::{ExceptionCode, FunctionCode};

    // 0x83 followed by 0x02: the assertions below expect this to decode as an
    // IllegalDataAddress exception for ReadHoldingRegisters.
    let pdu = [0x83, 0x02];
    let mut src = BytesMut::from(build_mbap_adu(0x0042, 0x01, &pdu).as_slice());

    let mut codec = MbapCodec;
    let frame = codec.decode(&mut src).unwrap().unwrap();

    match decode_response(&frame.pdu).unwrap() {
        ResponsePdu::Exception(exc) => {
            assert_eq!(exc.function_code, FunctionCode::ReadHoldingRegisters);
            assert_eq!(exc.exception_code, ExceptionCode::IllegalDataAddress);
        }
        other => panic!("unexpected: {other:?}"),
    }
}
/// Feeds an ADU to the codec one byte at a time and checks that a frame is
/// produced only once the final byte has arrived.
#[test]
fn end_to_end_partial_reads() {
    let pdu = [0x03, 0x06, 0x02, 0x2B, 0x00, 0x00, 0x00, 0x64];
    let adu = build_mbap_adu(0x0001, 0xFF, &pdu);

    let mut codec = MbapCodec;
    let mut src = BytesMut::new();

    // `chunks(1)` yields the ADU as consecutive one-byte slices.
    for (i, chunk) in adu.chunks(1).enumerate() {
        src.extend_from_slice(chunk);
        let decoded = codec.decode(&mut src).unwrap();

        let is_final_byte = i + 1 == adu.len();
        if is_final_byte {
            let frame = decoded.expect("should decode on final byte");
            assert_eq!(frame.pdu.len(), 8);
            continue;
        }

        // Every earlier byte must leave the codec waiting for more input.
        assert!(decoded.is_none(), "should be None at byte {i}");
    }
}
/// Places two ADUs back to back in one buffer and checks that two successive
/// `decode` calls return them in order with the right unit ids and payloads.
#[test]
fn end_to_end_multiple_frames_in_stream() {
    // First payload: expected to decode as ReadHoldingRegisters, register 0 == 0x000A.
    let pdu1 = [0x03, 0x02, 0x00, 0x0A];
    // Second payload: expected to decode as ReadCoils with coil 0 set.
    let pdu2 = [0x01, 0x01, 0x01];

    let stream = [
        build_mbap_adu(1, 0xFF, &pdu1),
        build_mbap_adu(2, 0x01, &pdu2),
    ]
    .concat();

    let mut codec = MbapCodec;
    let mut src = BytesMut::from(stream.as_slice());
    let first = codec.decode(&mut src).unwrap().unwrap();
    let second = codec.decode(&mut src).unwrap().unwrap();

    assert_eq!(first.unit_id(), 0xFF);
    assert_eq!(second.unit_id(), 0x01);

    match decode_response(&first.pdu).unwrap() {
        ResponsePdu::ReadHoldingRegisters(r) => assert_eq!(r.register(0), 0x000A),
        other => panic!("unexpected: {other:?}"),
    }
    match decode_response(&second.pdu).unwrap() {
        ResponsePdu::ReadCoils(r) => assert!(r.coil(0)),
        other => panic!("unexpected: {other:?}"),
    }
}
/// Calls `from_pdu` with an empty `Bytes` on each listed owned response type
/// and checks that every one returns
/// `DecodeError::Truncated { expected: 1, actual: 0 }`.
#[test]
fn owned_response_constructors_reject_empty_pdu_without_panicking() {
    // Expands to one assertion per type in the comma-separated list.
    macro_rules! assert_empty_pdu_truncated {
        ($($ty:ty),+ $(,)?) => {
            $(
                assert!(matches!(
                    <$ty>::from_pdu(Bytes::new()),
                    Err(DecodeError::Truncated {
                        expected: 1,
                        actual: 0
                    })
                ));
            )+
        };
    }

    assert_empty_pdu_truncated!(
        OwnedReadCoilsResponse,
        OwnedReadDiscreteInputsResponse,
        OwnedReadHoldingRegistersResponse,
        OwnedReadInputRegistersResponse,
        OwnedReadFifoQueueResponse,
        OwnedDiagnosticsResponse,
        OwnedGetCommEventLogResponse,
        OwnedReportServerIdResponse,
        OwnedReadFileRecordResponse,
        OwnedWriteFileRecordResponse,
        OwnedReadWriteMultipleRegistersResponse,
        OwnedEncapsulatedInterfaceResponse,
    );
}