use std::net::SocketAddr;
use std::time::Duration;
use crate::coalescer::ReadCoalescer;
use crate::device_limits::DeviceLimits;
use crate::error::{ModbusError, ModbusResult};
use crate::logging::CallbackLogger;
use crate::protocol::{ModbusFunction, ModbusRequest, ModbusResponse, SlaveId};
use crate::transport::{ModbusTransport, TcpTransport, TransportStats};
#[cfg(feature = "rtu")]
use crate::transport::RtuTransport;
pub trait ModbusClient: Send + Sync {
fn read_01(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> impl std::future::Future<Output = ModbusResult<Vec<bool>>> + Send;
fn read_02(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> impl std::future::Future<Output = ModbusResult<Vec<bool>>> + Send;
fn read_03(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> impl std::future::Future<Output = ModbusResult<Vec<u16>>> + Send;
fn read_04(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> impl std::future::Future<Output = ModbusResult<Vec<u16>>> + Send;
fn write_05(
&mut self,
slave_id: SlaveId,
address: u16,
value: bool,
) -> impl std::future::Future<Output = ModbusResult<()>> + Send;
fn write_06(
&mut self,
slave_id: SlaveId,
address: u16,
value: u16,
) -> impl std::future::Future<Output = ModbusResult<()>> + Send;
fn write_0f(
&mut self,
slave_id: SlaveId,
address: u16,
values: &[bool],
) -> impl std::future::Future<Output = ModbusResult<()>> + Send;
fn write_10(
&mut self,
slave_id: SlaveId,
address: u16,
values: &[u16],
) -> impl std::future::Future<Output = ModbusResult<()>> + Send;
fn read_01_batch(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
limits: &DeviceLimits,
) -> impl std::future::Future<Output = ModbusResult<Vec<bool>>> + Send
where
Self: Sized,
{
let max_read_coils = limits.max_read_coils;
let inter_request_delay_ms = limits.inter_request_delay_ms;
async move {
if quantity == 0 {
return Ok(Vec::new());
}
let mut result = Vec::with_capacity(quantity as usize);
let mut current_address = address;
let mut remaining = quantity;
while remaining > 0 {
let count = remaining.min(max_read_coils);
let chunk = self.read_01(slave_id, current_address, count).await?;
result.extend_from_slice(&chunk);
current_address = current_address.saturating_add(count);
remaining -= count;
if inter_request_delay_ms > 0 && remaining > 0 {
tokio::time::sleep(Duration::from_millis(inter_request_delay_ms)).await;
}
}
Ok(result)
}
}
fn read_02_batch(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
limits: &DeviceLimits,
) -> impl std::future::Future<Output = ModbusResult<Vec<bool>>> + Send
where
Self: Sized,
{
let max_read_coils = limits.max_read_coils;
let inter_request_delay_ms = limits.inter_request_delay_ms;
async move {
if quantity == 0 {
return Ok(Vec::new());
}
let mut result = Vec::with_capacity(quantity as usize);
let mut current_address = address;
let mut remaining = quantity;
while remaining > 0 {
let count = remaining.min(max_read_coils);
let chunk = self.read_02(slave_id, current_address, count).await?;
result.extend_from_slice(&chunk);
current_address = current_address.saturating_add(count);
remaining -= count;
if inter_request_delay_ms > 0 && remaining > 0 {
tokio::time::sleep(Duration::from_millis(inter_request_delay_ms)).await;
}
}
Ok(result)
}
}
fn read_03_batch(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
limits: &DeviceLimits,
) -> impl std::future::Future<Output = ModbusResult<Vec<u16>>> + Send
where
Self: Sized,
{
let max_read_registers = limits.max_read_registers;
let inter_request_delay_ms = limits.inter_request_delay_ms;
async move {
if quantity == 0 {
return Ok(Vec::new());
}
let mut result = Vec::with_capacity(quantity as usize);
let mut current_address = address;
let mut remaining = quantity;
while remaining > 0 {
let count = remaining.min(max_read_registers);
let chunk = self.read_03(slave_id, current_address, count).await?;
result.extend_from_slice(&chunk);
current_address = current_address.saturating_add(count);
remaining -= count;
if inter_request_delay_ms > 0 && remaining > 0 {
tokio::time::sleep(Duration::from_millis(inter_request_delay_ms)).await;
}
}
Ok(result)
}
}
fn read_04_batch(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
limits: &DeviceLimits,
) -> impl std::future::Future<Output = ModbusResult<Vec<u16>>> + Send
where
Self: Sized,
{
let max_read_registers = limits.max_read_registers;
let inter_request_delay_ms = limits.inter_request_delay_ms;
async move {
if quantity == 0 {
return Ok(Vec::new());
}
let mut result = Vec::with_capacity(quantity as usize);
let mut current_address = address;
let mut remaining = quantity;
while remaining > 0 {
let count = remaining.min(max_read_registers);
let chunk = self.read_04(slave_id, current_address, count).await?;
result.extend_from_slice(&chunk);
current_address = current_address.saturating_add(count);
remaining -= count;
if inter_request_delay_ms > 0 && remaining > 0 {
tokio::time::sleep(Duration::from_millis(inter_request_delay_ms)).await;
}
}
Ok(result)
}
}
fn is_connected(&self) -> bool;
fn close(&mut self) -> impl std::future::Future<Output = ModbusResult<()>> + Send;
fn get_stats(&self) -> TransportStats;
#[inline]
fn read_coils(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> impl std::future::Future<Output = ModbusResult<Vec<bool>>> + Send {
self.read_01(slave_id, address, quantity)
}
#[inline]
fn read_discrete_inputs(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> impl std::future::Future<Output = ModbusResult<Vec<bool>>> + Send {
self.read_02(slave_id, address, quantity)
}
#[inline]
fn read_holding_registers(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> impl std::future::Future<Output = ModbusResult<Vec<u16>>> + Send {
self.read_03(slave_id, address, quantity)
}
#[inline]
fn read_input_registers(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> impl std::future::Future<Output = ModbusResult<Vec<u16>>> + Send {
self.read_04(slave_id, address, quantity)
}
#[inline]
fn write_single_coil(
&mut self,
slave_id: SlaveId,
address: u16,
value: bool,
) -> impl std::future::Future<Output = ModbusResult<()>> + Send {
self.write_05(slave_id, address, value)
}
#[inline]
fn write_single_register(
&mut self,
slave_id: SlaveId,
address: u16,
value: u16,
) -> impl std::future::Future<Output = ModbusResult<()>> + Send {
self.write_06(slave_id, address, value)
}
#[inline]
fn write_multiple_coils(
&mut self,
slave_id: SlaveId,
address: u16,
values: &[bool],
) -> impl std::future::Future<Output = ModbusResult<()>> + Send {
self.write_0f(slave_id, address, values)
}
#[inline]
fn write_multiple_registers(
&mut self,
slave_id: SlaveId,
address: u16,
values: &[u16],
) -> impl std::future::Future<Output = ModbusResult<()>> + Send {
self.write_10(slave_id, address, values)
}
#[inline]
fn read_coils_batch(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
limits: &DeviceLimits,
) -> impl std::future::Future<Output = ModbusResult<Vec<bool>>> + Send
where
Self: Sized,
{
self.read_01_batch(slave_id, address, quantity, limits)
}
#[inline]
fn read_discrete_inputs_batch(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
limits: &DeviceLimits,
) -> impl std::future::Future<Output = ModbusResult<Vec<bool>>> + Send
where
Self: Sized,
{
self.read_02_batch(slave_id, address, quantity, limits)
}
#[inline]
fn read_holding_registers_batch(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
limits: &DeviceLimits,
) -> impl std::future::Future<Output = ModbusResult<Vec<u16>>> + Send
where
Self: Sized,
{
self.read_03_batch(slave_id, address, quantity, limits)
}
#[inline]
fn read_input_registers_batch(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
limits: &DeviceLimits,
) -> impl std::future::Future<Output = ModbusResult<Vec<u16>>> + Send
where
Self: Sized,
{
self.read_04_batch(slave_id, address, quantity, limits)
}
}
/// Transport-agnostic Modbus client.
///
/// Wraps any [`ModbusTransport`] and optionally reports each request and
/// response to a [`CallbackLogger`].
pub struct GenericModbusClient<T>
where
    T: ModbusTransport,
{
    /// Transport every request is sent through.
    transport: T,
    /// Request/response logger; `None` disables logging.
    logger: Option<CallbackLogger>,
}
impl<T: ModbusTransport> GenericModbusClient<T> {
    /// Builds a client over `transport` with logging disabled.
    pub fn new(transport: T) -> Self {
        Self {
            logger: None,
            transport,
        }
    }

    /// Builds a client over `transport` that reports traffic to `logger`.
    pub fn with_logger(transport: T, logger: CallbackLogger) -> Self {
        Self {
            logger: Some(logger),
            transport,
        }
    }

    /// Shared access to the underlying transport.
    pub fn transport(&self) -> &T {
        &self.transport
    }

    /// Exclusive access to the underlying transport.
    pub fn transport_mut(&mut self) -> &mut T {
        &mut self.transport
    }

    /// Sends `request` through the transport and returns its response.
    ///
    /// The request and the response are handed to the logger when one is
    /// configured.
    ///
    /// # Errors
    ///
    /// Rejects read functions addressed to slave id 0 before anything is sent,
    /// and otherwise propagates any transport error.
    pub async fn execute_request(
        &mut self,
        request: ModbusRequest,
    ) -> ModbusResult<ModbusResponse> {
        // Slave id 0 is treated as the broadcast address; only writes may use it.
        let broadcast_read = request.slave_id == 0 && request.function.is_read_function();
        if broadcast_read {
            return Err(ModbusError::invalid_data(
                "Broadcast (slave_id=0) is only valid for write operations",
            ));
        }

        if let Some(logger) = self.logger.as_ref() {
            logger.log_request(
                None,
                request.slave_id,
                request.function.to_u8(),
                request.address,
                request.quantity,
                &request.data,
            );
        }

        let response = self.transport.request(&request).await?;

        if let Some(logger) = self.logger.as_ref() {
            logger.log_response(
                None,
                response.slave_id,
                response.function.to_u8(),
                response.data(),
            );
        }

        Ok(response)
    }
}
impl<T: ModbusTransport + Send + Sync> ModbusClient for GenericModbusClient<T> {
    /// Reads 1..=2000 coils; the decoded bits are trimmed to `quantity`.
    async fn read_01(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        quantity: u16,
    ) -> ModbusResult<Vec<bool>> {
        if !(1..=2000).contains(&quantity) {
            return Err(ModbusError::invalid_data("Invalid quantity"));
        }
        let response = self
            .execute_request(ModbusRequest {
                slave_id,
                function: ModbusFunction::ReadCoils,
                address,
                quantity,
                data: Vec::new(),
            })
            .await?;
        // The payload is whole bytes, so drop any padding bits past `quantity`.
        let mut coils = response.parse_bits()?;
        coils.truncate(usize::from(quantity));
        Ok(coils)
    }

    /// Reads 1..=2000 discrete inputs; the decoded bits are trimmed to `quantity`.
    async fn read_02(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        quantity: u16,
    ) -> ModbusResult<Vec<bool>> {
        if !(1..=2000).contains(&quantity) {
            return Err(ModbusError::invalid_data("Invalid quantity"));
        }
        let response = self
            .execute_request(ModbusRequest {
                slave_id,
                function: ModbusFunction::ReadDiscreteInputs,
                address,
                quantity,
                data: Vec::new(),
            })
            .await?;
        let mut inputs = response.parse_bits()?;
        inputs.truncate(usize::from(quantity));
        Ok(inputs)
    }

    /// Reads 1..=125 holding registers.
    async fn read_03(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        quantity: u16,
    ) -> ModbusResult<Vec<u16>> {
        if !(1..=125).contains(&quantity) {
            return Err(ModbusError::invalid_data("Invalid quantity"));
        }
        let response = self
            .execute_request(ModbusRequest {
                slave_id,
                function: ModbusFunction::ReadHoldingRegisters,
                address,
                quantity,
                data: Vec::new(),
            })
            .await?;
        response.parse_registers()
    }

    /// Reads 1..=125 input registers.
    async fn read_04(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        quantity: u16,
    ) -> ModbusResult<Vec<u16>> {
        if !(1..=125).contains(&quantity) {
            return Err(ModbusError::invalid_data("Invalid quantity"));
        }
        let response = self
            .execute_request(ModbusRequest {
                slave_id,
                function: ModbusFunction::ReadInputRegisters,
                address,
                quantity,
                data: Vec::new(),
            })
            .await?;
        response.parse_registers()
    }

    /// Writes one coil; `true` is sent as `FF 00`, `false` as `00 00`.
    async fn write_05(&mut self, slave_id: SlaveId, address: u16, value: bool) -> ModbusResult<()> {
        let encoded: u16 = if value { 0xFF00 } else { 0x0000 };
        let request = ModbusRequest {
            slave_id,
            function: ModbusFunction::WriteSingleCoil,
            address,
            quantity: 1,
            data: encoded.to_be_bytes().to_vec(),
        };
        self.execute_request(request).await.map(|_response| ())
    }

    /// Writes one register, sent big-endian.
    async fn write_06(&mut self, slave_id: SlaveId, address: u16, value: u16) -> ModbusResult<()> {
        let request = ModbusRequest {
            slave_id,
            function: ModbusFunction::WriteSingleRegister,
            address,
            quantity: 1,
            data: value.to_be_bytes().to_vec(),
        };
        self.execute_request(request).await.map(|_response| ())
    }

    /// Writes 1..=1968 coils, packed eight per byte with the first coil in
    /// the least significant bit.
    async fn write_0f(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        values: &[bool],
    ) -> ModbusResult<()> {
        if !(1..=1968).contains(&values.len()) {
            return Err(ModbusError::invalid_data("Invalid quantity"));
        }
        let data: Vec<u8> = values
            .chunks(8)
            .map(|group| {
                group
                    .iter()
                    .enumerate()
                    .fold(0u8, |packed, (bit, &on)| {
                        if on {
                            packed | (1 << bit)
                        } else {
                            packed
                        }
                    })
            })
            .collect();
        let request = ModbusRequest {
            slave_id,
            function: ModbusFunction::WriteMultipleCoils,
            address,
            // Cannot truncate: the length was bounded to 1968 above.
            quantity: values.len() as u16,
            data,
        };
        self.execute_request(request).await.map(|_response| ())
    }

    /// Writes 1..=123 registers, each sent big-endian.
    async fn write_10(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        values: &[u16],
    ) -> ModbusResult<()> {
        if !(1..=123).contains(&values.len()) {
            return Err(ModbusError::invalid_data("Invalid quantity"));
        }
        let data: Vec<u8> = values
            .iter()
            .flat_map(|register| register.to_be_bytes())
            .collect();
        let request = ModbusRequest {
            slave_id,
            function: ModbusFunction::WriteMultipleRegisters,
            address,
            // Cannot truncate: the length was bounded to 123 above.
            quantity: values.len() as u16,
            data,
        };
        self.execute_request(request).await.map(|_response| ())
    }

    /// Reports the transport's connection state.
    fn is_connected(&self) -> bool {
        self.transport.is_connected()
    }

    /// Closes the transport.
    async fn close(&mut self) -> ModbusResult<()> {
        self.transport.close().await
    }

    /// Returns the transport's statistics.
    fn get_stats(&self) -> TransportStats {
        self.transport.get_stats()
    }
}
impl<T: ModbusTransport + Send + Sync> GenericModbusClient<T> {
    /// Reads several `(address, quantity)` holding-register regions, letting
    /// the [`ReadCoalescer`] merge them into fewer requests.
    ///
    /// The result holds one vector per entry of `regions`, in the same order.
    pub async fn read_holding_registers_coalesced(
        &mut self,
        slave_id: u8,
        regions: &[(u16, u16)],
    ) -> ModbusResult<Vec<Vec<u16>>> {
        self.inner_read_coalesced(slave_id, 0x03, regions).await
    }

    /// Reads several `(address, quantity)` input-register regions, letting the
    /// [`ReadCoalescer`] merge them into fewer requests.
    ///
    /// The result holds one vector per entry of `regions`, in the same order.
    pub async fn read_input_registers_coalesced(
        &mut self,
        slave_id: u8,
        regions: &[(u16, u16)],
    ) -> ModbusResult<Vec<Vec<u16>>> {
        self.inner_read_coalesced(slave_id, 0x04, regions).await
    }

    /// Shared implementation of the coalesced readers.
    ///
    /// `function` must be `0x03` or `0x04`; any other code yields an
    /// invalid-function error once the first coalesced request is processed.
    async fn inner_read_coalesced(
        &mut self,
        slave_id: u8,
        function: u8,
        regions: &[(u16, u16)],
    ) -> ModbusResult<Vec<Vec<u16>>> {
        if regions.is_empty() {
            return Ok(Vec::new());
        }

        let mut requests: Vec<crate::coalescer::ReadRequest> = Vec::with_capacity(regions.len());
        for &(address, quantity) in regions {
            requests.push(crate::coalescer::ReadRequest::new(
                slave_id, function, address, quantity,
            ));
        }

        let coalescer = ReadCoalescer::new();
        let coalesced_list = coalescer.coalesce(&requests);

        // One slot per caller-supplied region, filled in as merged reads complete.
        let mut results: Vec<Vec<u16>> = vec![Vec::new(); regions.len()];
        for coalesced in &coalesced_list {
            let data = if function == 0x03 {
                self.read_03(slave_id, coalesced.address, coalesced.quantity)
                    .await?
            } else if function == 0x04 {
                self.read_04(slave_id, coalesced.address, coalesced.quantity)
                    .await?
            } else {
                return Err(ModbusError::invalid_function(function));
            };

            let extracted = coalescer.extract_results(coalesced, &data);
            for (slot, mapping) in coalesced.mappings.iter().enumerate() {
                // The first tuple element is the index of the original region.
                let target = mapping.0;
                results[target] = extracted[slot].clone();
            }
        }
        Ok(results)
    }
}
/// Modbus client bound to a [`TcpTransport`].
///
/// Every operation is delegated to the wrapped [`GenericModbusClient`].
pub struct ModbusTcpClient {
// Generic client that performs the actual requests over TCP.
inner: GenericModbusClient<TcpTransport>,
}
impl ModbusTcpClient {
    /// Creates a TCP transport for `addr` with the given `timeout` and wraps
    /// it in a client without logging.
    pub async fn new(addr: SocketAddr, timeout: Duration) -> ModbusResult<Self> {
        TcpTransport::new(addr, timeout)
            .await
            .map(Self::from_transport)
    }

    /// Like [`Self::from_address`], but attaches a logger.
    ///
    /// When `logger` is `None` a default-constructed [`CallbackLogger`] is
    /// used instead.
    ///
    /// # Errors
    ///
    /// Returns a configuration error when `addr` is not a valid socket
    /// address, or whatever error creating the transport produces.
    pub async fn with_logging(
        addr: &str,
        timeout: Duration,
        logger: Option<CallbackLogger>,
    ) -> ModbusResult<Self> {
        let socket_addr = addr
            .parse::<SocketAddr>()
            .map_err(|e| ModbusError::configuration(format!("Invalid address: {}", e)))?;
        let transport = TcpTransport::new(socket_addr, timeout).await?;
        let inner = GenericModbusClient::with_logger(transport, logger.unwrap_or_default());
        Ok(Self { inner })
    }

    /// Parses `addr` as a socket address and calls [`Self::new`].
    ///
    /// # Errors
    ///
    /// Returns a configuration error when `addr` is not a valid socket
    /// address, or whatever error creating the transport produces.
    pub async fn from_address(addr: &str, timeout: Duration) -> ModbusResult<Self> {
        let socket_addr = addr
            .parse::<SocketAddr>()
            .map_err(|e| ModbusError::configuration(format!("Invalid address: {}", e)))?;
        Self::new(socket_addr, timeout).await
    }

    /// Wraps an already constructed transport, without logging.
    pub fn from_transport(transport: TcpTransport) -> Self {
        let inner = GenericModbusClient::new(transport);
        Self { inner }
    }

    /// Returns the transport's `address` field.
    pub fn server_address(&self) -> SocketAddr {
        self.inner.transport().address
    }

    /// Forwards `enabled` to the transport's packet-logging switch.
    pub fn set_packet_logging(&mut self, enabled: bool) {
        let transport = self.inner.transport_mut();
        transport.set_packet_logging(enabled);
    }

    /// Delegates to [`GenericModbusClient::execute_request`].
    pub async fn execute_request(
        &mut self,
        request: ModbusRequest,
    ) -> ModbusResult<ModbusResponse> {
        self.inner.execute_request(request).await
    }

    /// Sends all `requests` first, then collects their responses.
    ///
    /// The outer `Result` covers failures of the send/receive phases; the
    /// inner results follow the order of `requests`. A request whose
    /// transaction id is absent from the received responses is reported as a
    /// timeout carrying `pipeline_timeout` in milliseconds.
    pub async fn pipeline(
        &mut self,
        requests: Vec<ModbusRequest>,
        pipeline_timeout: Duration,
    ) -> ModbusResult<Vec<ModbusResult<ModbusResponse>>> {
        if requests.is_empty() {
            return Ok(Vec::new());
        }

        let expected = requests.len();
        let transport = self.inner.transport_mut();
        let tids = transport.send_pipeline_requests(&requests).await?;
        let mut response_map = transport
            .receive_pipeline_responses(expected, pipeline_timeout)
            .await?;

        let mut results = Vec::with_capacity(expected);
        for tid in tids {
            let outcome = match response_map.remove(&tid) {
                Some(received) => received,
                None => Err(ModbusError::timeout(
                    "pipeline response missing",
                    pipeline_timeout.as_millis() as u64,
                )),
            };
            results.push(outcome);
        }
        Ok(results)
    }

    /// Pipelines one holding-register read per `(address, quantity)` pair in
    /// `reads` and decodes each successful response into registers.
    pub async fn pipeline_reads(
        &mut self,
        slave_id: SlaveId,
        reads: &[(u16, u16)],
        pipeline_timeout: Duration,
    ) -> ModbusResult<Vec<ModbusResult<Vec<u16>>>> {
        let mut requests: Vec<ModbusRequest> = Vec::with_capacity(reads.len());
        for &(address, quantity) in reads {
            requests.push(ModbusRequest::new_read(
                slave_id,
                ModbusFunction::ReadHoldingRegisters,
                address,
                quantity,
            ));
        }

        let raw_results = self.pipeline(requests, pipeline_timeout).await?;

        let mut decoded = Vec::with_capacity(raw_results.len());
        for raw in raw_results {
            decoded.push(match raw {
                Ok(response) => response.parse_registers(),
                Err(error) => Err(error),
            });
        }
        Ok(decoded)
    }
}
impl ModbusClient for ModbusTcpClient {
async fn read_01(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> ModbusResult<Vec<bool>> {
self.inner.read_01(slave_id, address, quantity).await
}
async fn read_02(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> ModbusResult<Vec<bool>> {
self.inner.read_02(slave_id, address, quantity).await
}
async fn read_03(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> ModbusResult<Vec<u16>> {
self.inner.read_03(slave_id, address, quantity).await
}
async fn read_04(
&mut self,
slave_id: SlaveId,
address: u16,
quantity: u16,
) -> ModbusResult<Vec<u16>> {
self.inner.read_04(slave_id, address, quantity).await
}
async fn write_05(&mut self, slave_id: SlaveId, address: u16, value: bool) -> ModbusResult<()> {
self.inner.write_05(slave_id, address, value).await
}
async fn write_06(&mut self, slave_id: SlaveId, address: u16, value: u16) -> ModbusResult<()> {
self.inner.write_06(slave_id, address, value).await
}
async fn write_0f(
&mut self,
slave_id: SlaveId,
address: u16,
values: &[bool],
) -> ModbusResult<()> {
self.inner.write_0f(slave_id, address, values).await
}
async fn write_10(
&mut self,
slave_id: SlaveId,
address: u16,
values: &[u16],
) -> ModbusResult<()> {
self.inner.write_10(slave_id, address, values).await
}
fn is_connected(&self) -> bool {
self.inner.is_connected()
}
async fn close(&mut self) -> ModbusResult<()> {
self.inner.close().await
}
fn get_stats(&self) -> TransportStats {
self.inner.get_stats()
}
}
/// Modbus client bound to an [`RtuTransport`]; only compiled with the `rtu`
/// feature.
///
/// Every operation is delegated to the wrapped [`GenericModbusClient`].
#[cfg(feature = "rtu")]
pub struct ModbusRtuClient {
// Generic client that performs the actual requests over the RTU transport.
inner: GenericModbusClient<RtuTransport>,
}
#[cfg(feature = "rtu")]
impl ModbusRtuClient {
    /// Opens an RTU transport on `port` at `baud_rate` and wraps it in a
    /// client without logging.
    pub fn new(port: &str, baud_rate: u32) -> ModbusResult<Self> {
        let transport = RtuTransport::new(port, baud_rate)?;
        Ok(Self::from_transport(transport))
    }

    /// Like [`Self::new`], but attaches a logger.
    ///
    /// When `logger` is `None` a default-constructed [`CallbackLogger`] is
    /// used instead.
    pub fn with_logging(
        port: &str,
        baud_rate: u32,
        logger: Option<CallbackLogger>,
    ) -> ModbusResult<Self> {
        let transport = RtuTransport::new(port, baud_rate)?;
        let inner = GenericModbusClient::with_logger(transport, logger.unwrap_or_default());
        Ok(Self { inner })
    }

    /// Opens an RTU transport with explicit serial settings and attaches a
    /// logger (a default-constructed one when `logger` is `None`).
    pub fn with_config_and_logging(
        port: &str,
        baud_rate: u32,
        data_bits: tokio_serial::DataBits,
        stop_bits: tokio_serial::StopBits,
        parity: tokio_serial::Parity,
        timeout: Duration,
        logger: Option<CallbackLogger>,
    ) -> ModbusResult<Self> {
        let transport =
            RtuTransport::new_with_config(port, baud_rate, data_bits, stop_bits, parity, timeout)?;
        let inner = GenericModbusClient::with_logger(transport, logger.unwrap_or_default());
        Ok(Self { inner })
    }

    /// Wraps an already constructed transport, without logging.
    pub fn from_transport(transport: RtuTransport) -> Self {
        let inner = GenericModbusClient::new(transport);
        Self { inner }
    }

    /// Shared access to the underlying RTU transport.
    pub fn transport(&self) -> &RtuTransport {
        self.inner.transport()
    }

    /// Forwards `enabled` to the transport's packet-logging switch.
    pub fn set_packet_logging(&mut self, enabled: bool) {
        let transport = self.inner.transport_mut();
        transport.set_packet_logging(enabled);
    }

    /// Delegates to [`GenericModbusClient::execute_request`].
    pub async fn execute_request(
        &mut self,
        request: ModbusRequest,
    ) -> ModbusResult<ModbusResponse> {
        self.inner.execute_request(request).await
    }
}
/// Every method forwards to the wrapped [`GenericModbusClient`].
#[cfg(feature = "rtu")]
impl ModbusClient for ModbusRtuClient {
    async fn read_01(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        quantity: u16,
    ) -> ModbusResult<Vec<bool>> {
        let pending = self.inner.read_01(slave_id, address, quantity);
        pending.await
    }

    async fn read_02(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        quantity: u16,
    ) -> ModbusResult<Vec<bool>> {
        let pending = self.inner.read_02(slave_id, address, quantity);
        pending.await
    }

    async fn read_03(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        quantity: u16,
    ) -> ModbusResult<Vec<u16>> {
        let pending = self.inner.read_03(slave_id, address, quantity);
        pending.await
    }

    async fn read_04(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        quantity: u16,
    ) -> ModbusResult<Vec<u16>> {
        let pending = self.inner.read_04(slave_id, address, quantity);
        pending.await
    }

    async fn write_05(&mut self, slave_id: SlaveId, address: u16, value: bool) -> ModbusResult<()> {
        let pending = self.inner.write_05(slave_id, address, value);
        pending.await
    }

    async fn write_06(&mut self, slave_id: SlaveId, address: u16, value: u16) -> ModbusResult<()> {
        let pending = self.inner.write_06(slave_id, address, value);
        pending.await
    }

    async fn write_0f(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        values: &[bool],
    ) -> ModbusResult<()> {
        let pending = self.inner.write_0f(slave_id, address, values);
        pending.await
    }

    async fn write_10(
        &mut self,
        slave_id: SlaveId,
        address: u16,
        values: &[u16],
    ) -> ModbusResult<()> {
        let pending = self.inner.write_10(slave_id, address, values);
        pending.await
    }

    fn is_connected(&self) -> bool {
        let client = &self.inner;
        client.is_connected()
    }

    async fn close(&mut self) -> ModbusResult<()> {
        let pending = self.inner.close();
        pending.await
    }

    fn get_stats(&self) -> TransportStats {
        let client = &self.inner;
        client.get_stats()
    }
}
/// Convenience helpers: multi-operation reads/writes and big-endian
/// conversions between 16-bit registers and 32-bit values.
pub mod utils {
    use super::*;

    /// Runs each `(function, address, quantity)` read in order and returns one
    /// register vector per operation.
    ///
    /// # Errors
    ///
    /// Stops at the first failing read. Functions other than
    /// `ReadHoldingRegisters` and `ReadInputRegisters` produce an
    /// invalid-function error.
    pub async fn read_mixed_registers<T: ModbusClient>(
        client: &mut T,
        slave_id: SlaveId,
        operations: &[(ModbusFunction, u16, u16)],
    ) -> ModbusResult<Vec<Vec<u16>>> {
        let mut collected = Vec::with_capacity(operations.len());
        for operation in operations {
            let (function, address, quantity) = *operation;
            let registers = match function {
                ModbusFunction::ReadHoldingRegisters => {
                    client.read_03(slave_id, address, quantity).await?
                }
                ModbusFunction::ReadInputRegisters => {
                    client.read_04(slave_id, address, quantity).await?
                }
                other => return Err(ModbusError::invalid_function(other.to_u8())),
            };
            collected.push(registers);
        }
        Ok(collected)
    }

    /// Performs each `(address, values)` write in order, stopping at the
    /// first error.
    ///
    /// Exactly one value is written with `write_06`; any other count goes
    /// through `write_10`.
    pub async fn batch_write_registers<T: ModbusClient>(
        client: &mut T,
        slave_id: SlaveId,
        writes: &[(u16, Vec<u16>)],
    ) -> ModbusResult<()> {
        for (address, values) in writes {
            match values.as_slice() {
                [single] => client.write_06(slave_id, *address, *single).await?,
                several => client.write_10(slave_id, *address, several).await?,
            }
        }
        Ok(())
    }

    /// Combines register pairs into `u32` values, high word first.
    ///
    /// A trailing unpaired register is ignored.
    pub fn registers_to_u32_be(registers: &[u16]) -> Vec<u32> {
        registers
            .chunks_exact(2)
            .map(|pair| (u32::from(pair[0]) << 16) | u32::from(pair[1]))
            .collect()
    }

    /// Like [`registers_to_u32_be`], reinterpreting each value as `i32`.
    pub fn registers_to_i32_be(registers: &[u16]) -> Vec<i32> {
        registers_to_u32_be(registers)
            .iter()
            .map(|&word| word as i32)
            .collect()
    }

    /// Like [`registers_to_u32_be`], reinterpreting each value's bits as `f32`.
    pub fn registers_to_f32_be(registers: &[u16]) -> Vec<f32> {
        registers_to_u32_be(registers)
            .iter()
            .map(|&bits| f32::from_bits(bits))
            .collect()
    }

    /// Splits each `u32` into two registers, high word first.
    pub fn u32_to_registers_be(values: &[u32]) -> Vec<u16> {
        let mut registers = Vec::with_capacity(values.len() * 2);
        for &value in values {
            registers.push((value >> 16) as u16);
            registers.push(value as u16);
        }
        registers
    }

    /// Splits the bit pattern of each `f32` into two registers, high word first.
    pub fn f32_to_registers_be(values: &[f32]) -> Vec<u16> {
        values
            .iter()
            .map(|value| value.to_bits())
            .flat_map(|bits| [(bits >> 16) as u16, bits as u16])
            .collect()
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Four registers convert to two big-endian `u32` values and back unchanged.
#[test]
fn test_register_conversion() {
    let registers = vec![0x1234, 0x5678, 0xABCD, 0xEF01];
    // Borrow the register vector (the reference had been corrupted into a
    // "registered trademark" character, which is not valid Rust).
    let u32_values = utils::registers_to_u32_be(&registers);
    assert_eq!(u32_values, vec![0x12345678, 0xABCDEF01]);
    let back_to_registers = utils::u32_to_registers_be(&u32_values);
    assert_eq!(back_to_registers, registers);
}
/// `f32` values survive a round trip through their register encoding.
#[test]
fn test_float_conversion() {
    let float_values = vec![1.5f32, -2.75f32];
    let registers = utils::f32_to_registers_be(&float_values);
    // Borrow the register vector (the reference had been corrupted into a
    // "registered trademark" character, which is not valid Rust).
    let back_to_floats = utils::registers_to_f32_be(&registers);
    for (original, converted) in float_values.iter().zip(back_to_floats.iter()) {
        assert!((original - converted).abs() < f32::EPSILON);
    }
}
/// Smoke test: attempts a connection to a local port and only prints whether
/// it succeeded; nothing is asserted.
#[tokio::test]
async fn test_tcp_client_creation() {
    let timeout = std::time::Duration::from_secs(1);
    let result = ModbusTcpClient::from_address("127.0.0.1:9999", timeout).await;
    let created = result.is_ok();
    println!("TCP client creation result: {:?}", created);
}
use std::collections::VecDeque;
use std::sync::Mutex;
/// In-memory transport for tests: records requests and replays queued responses.
struct MockTransport {
// Every request passed to `request`, in call order.
requests: Mutex<Vec<ModbusRequest>>,
// Responses handed out front-to-back for non-broadcast requests.
responses: Mutex<VecDeque<ModbusResult<ModbusResponse>>>,
// Connection flag; starts `true` and is cleared by `close`.
connected: Mutex<bool>,
}
impl MockTransport {
fn new() -> Self {
Self {
requests: Mutex::new(Vec::new()),
responses: Mutex::new(VecDeque::new()),
connected: Mutex::new(true),
}
}
fn add_response(&self, response: ModbusResult<ModbusResponse>) {
self.responses.lock().unwrap().push_back(response);
}
fn get_requests(&self) -> Vec<ModbusRequest> {
self.requests.lock().unwrap().clone()
}
}
impl ModbusTransport for MockTransport {
    /// Records `request`, then answers with a broadcast acknowledgement for
    /// slave id 0 or with the next queued response otherwise (a connection
    /// error when the queue is empty).
    fn request(
        &mut self,
        request: &ModbusRequest,
    ) -> impl std::future::Future<Output = ModbusResult<ModbusResponse>> + Send {
        self.requests.lock().unwrap().push(request.clone());
        // The outcome is computed eagerly; the future only hands it over.
        let outcome = match request.slave_id {
            0 => Ok(ModbusResponse::new_broadcast_ack(request.function)),
            _ => match self.responses.lock().unwrap().pop_front() {
                Some(prepared) => prepared,
                None => Err(ModbusError::connection("No response prepared in mock")),
            },
        };
        std::future::ready(outcome)
    }

    /// Reports the current value of the connection flag.
    fn is_connected(&self) -> bool {
        let flag = self.connected.lock().unwrap();
        *flag
    }

    /// Clears the connection flag and succeeds.
    fn close(&mut self) -> impl std::future::Future<Output = ModbusResult<()>> + Send {
        let mut flag = self.connected.lock().unwrap();
        *flag = false;
        std::future::ready(Ok(()))
    }

    /// Returns default statistics.
    fn get_stats(&self) -> TransportStats {
        TransportStats::default()
    }
}
/// Builds a successful `ReadHoldingRegisters` response whose payload is a
/// byte count followed by `values` in big-endian order.
fn create_register_response(slave_id: SlaveId, values: &[u16]) -> ModbusResponse {
    let payload_len = values.len() * 2;
    let mut data = Vec::with_capacity(payload_len + 1);
    data.push(payload_len as u8);
    data.extend(values.iter().flat_map(|value| value.to_be_bytes()));
    ModbusResponse::new_success(slave_id, ModbusFunction::ReadHoldingRegisters, data)
}
/// Builds a successful `ReadCoils` response whose payload is a byte count
/// followed by `coils` packed eight per byte, first coil in the lowest bit.
fn create_coil_response(slave_id: SlaveId, coils: &[bool]) -> ModbusResponse {
    let byte_count = coils.len().div_ceil(8) as u8;
    let mut data = Vec::with_capacity(1 + byte_count as usize);
    data.push(byte_count);
    for group in coils.chunks(8) {
        let packed = group
            .iter()
            .enumerate()
            .fold(0u8, |acc, (bit, &set)| acc | (u8::from(set) << bit));
        data.push(packed);
    }
    ModbusResponse::new_success(slave_id, ModbusFunction::ReadCoils, data)
}
/// A read that fits within the limit is sent as a single request.
#[tokio::test]
async fn test_read_03_batch_single_chunk() {
    let expected: Vec<u16> = (1..=10).collect();
    let transport = MockTransport::new();
    transport.add_response(Ok(create_register_response(1, &expected)));
    let limits = DeviceLimits::new().with_max_read_registers(50);
    let mut client = GenericModbusClient::new(transport);

    let registers = client.read_03_batch(1, 0, 10, &limits).await.unwrap();
    assert_eq!(registers, expected);

    assert_eq!(client.transport().get_requests().len(), 1);
    let sent = client.transport().get_requests();
    let only = &sent[0];
    assert_eq!(only.address, 0);
    assert_eq!(only.quantity, 10);
}
/// 120 registers with a limit of 50 are fetched as chunks of 50, 50 and 20.
#[tokio::test]
async fn test_read_03_batch_multiple_chunks() {
    let transport = MockTransport::new();
    for range in [1..=50u16, 51..=100, 101..=120] {
        let chunk: Vec<u16> = range.collect();
        transport.add_response(Ok(create_register_response(1, &chunk)));
    }
    let mut client = GenericModbusClient::new(transport);
    let limits = DeviceLimits::new().with_max_read_registers(50);

    let registers = client.read_03_batch(1, 0, 120, &limits).await.unwrap();
    let expected: Vec<u16> = (1..=120).collect();
    assert_eq!(registers, expected);

    // (address, quantity) of each expected request, in order.
    let expected_requests = [(0u16, 50u16), (50, 50), (100, 20)];
    let sent = client.transport().get_requests();
    assert_eq!(sent.len(), 3);
    for (request, (address, quantity)) in sent.iter().zip(expected_requests) {
        assert_eq!(request.address, address);
        assert_eq!(request.quantity, quantity);
    }
}
/// A quantity exactly equal to the limit needs just one request.
#[tokio::test]
async fn test_read_03_batch_exact_boundary() {
    let expected: Vec<u16> = (1..=50).collect();
    let transport = MockTransport::new();
    transport.add_response(Ok(create_register_response(1, &expected)));
    let limits = DeviceLimits::new().with_max_read_registers(50);
    let mut client = GenericModbusClient::new(transport);

    let registers = client.read_03_batch(1, 100, 50, &limits).await.unwrap();
    assert_eq!(registers, expected);

    assert_eq!(client.transport().get_requests().len(), 1);
    let sent = client.transport().get_requests();
    let only = &sent[0];
    assert_eq!(only.address, 100);
    assert_eq!(only.quantity, 50);
}
/// A zero quantity yields an empty result and sends nothing.
#[tokio::test]
async fn test_read_03_batch_empty() {
    let limits = DeviceLimits::new();
    let mut client = GenericModbusClient::new(MockTransport::new());

    let registers = client.read_03_batch(1, 0, 0, &limits).await.unwrap();

    assert!(registers.is_empty());
    let sent = client.transport().get_requests();
    assert_eq!(sent.len(), 0);
}
/// A failure on the second chunk aborts the batch after two requests.
#[tokio::test]
async fn test_read_03_batch_error_propagation() {
    let first_chunk: Vec<u16> = (1..=50).collect();
    let transport = MockTransport::new();
    transport.add_response(Ok(create_register_response(1, &first_chunk)));
    transport.add_response(Err(ModbusError::timeout("Simulated timeout", 1000)));
    let limits = DeviceLimits::new().with_max_read_registers(50);
    let mut client = GenericModbusClient::new(transport);

    let outcome = client.read_03_batch(1, 0, 100, &limits).await;

    assert!(outcome.is_err());
    let sent = client.transport().get_requests();
    assert_eq!(sent.len(), 2);
}
/// 600 coils with a limit of 500 are fetched as chunks of 500 and 100.
#[tokio::test]
async fn test_read_01_batch_coils() {
    let first: Vec<bool> = (0..500).map(|i| i % 2 == 0).collect();
    let second: Vec<bool> = (0..100).map(|i| i % 3 == 0).collect();
    let transport = MockTransport::new();
    for chunk in [&first, &second] {
        transport.add_response(Ok(create_coil_response(1, chunk)));
    }
    let limits = DeviceLimits::new().with_max_read_coils(500);
    let mut client = GenericModbusClient::new(transport);

    let coils = client.read_01_batch(1, 0, 600, &limits).await.unwrap();
    assert_eq!(coils.len(), 600);

    let sent = client.transport().get_requests();
    assert_eq!(sent.len(), 2);
    let quantities: Vec<u16> = sent.iter().map(|request| request.quantity).collect();
    assert_eq!(quantities[0], 500);
    assert_eq!(quantities[1], 100);
}
/// A single-coil write to slave id 0 is sent and succeeds.
#[tokio::test]
async fn test_broadcast_write_coil() {
    let mut client = GenericModbusClient::new(MockTransport::new());

    let result = client.write_05(0, 1, true).await;
    assert!(
        result.is_ok(),
        "broadcast write_05 should succeed: {result:?}"
    );

    let sent = client.transport().get_requests();
    assert_eq!(sent.len(), 1);
    let request = &sent[0];
    assert_eq!(request.slave_id, 0);
    assert_eq!(request.function, ModbusFunction::WriteSingleCoil);
}
/// A single-register write to slave id 0 is sent and succeeds.
#[tokio::test]
async fn test_broadcast_write_register() {
    let mut client = GenericModbusClient::new(MockTransport::new());

    let result = client.write_06(0, 100, 0xABCD).await;
    assert!(
        result.is_ok(),
        "broadcast write_06 should succeed: {result:?}"
    );

    let sent = client.transport().get_requests();
    assert_eq!(sent.len(), 1);
    let request = &sent[0];
    assert_eq!(request.slave_id, 0);
    assert_eq!(request.function, ModbusFunction::WriteSingleRegister);
}
/// A multi-register write to slave id 0 is sent and succeeds.
#[tokio::test]
async fn test_broadcast_write_multiple() {
    let mut client = GenericModbusClient::new(MockTransport::new());
    let payload = [0x0001, 0x0002, 0x0003];

    let result = client.write_10(0, 0, &payload).await;
    assert!(
        result.is_ok(),
        "broadcast write_10 should succeed: {result:?}"
    );

    let sent = client.transport().get_requests();
    assert_eq!(sent.len(), 1);
    let request = &sent[0];
    assert_eq!(request.slave_id, 0);
    assert_eq!(request.function, ModbusFunction::WriteMultipleRegisters);
}
/// A read addressed to slave id 0 fails before reaching the transport.
#[tokio::test]
async fn test_broadcast_read_rejected() {
    let mut client = GenericModbusClient::new(MockTransport::new());

    let err = client.read_03(0, 0, 1).await.unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("Broadcast"),
        "expected broadcast error, got: {err}"
    );

    let sent = client.transport().get_requests();
    assert!(sent.is_empty());
}
/// A broadcast write yields an empty, non-exception acknowledgement that
/// echoes the slave id and function.
#[tokio::test]
async fn test_broadcast_response_is_ack() {
    let mut client = GenericModbusClient::new(MockTransport::new());
    let payload = vec![0x00, 0x01];
    let request = ModbusRequest::new_write(0, ModbusFunction::WriteSingleRegister, 10, payload);

    let ack = client.execute_request(request).await.unwrap();

    assert_eq!(ack.slave_id, 0);
    assert_eq!(ack.function, ModbusFunction::WriteSingleRegister);
    assert!(!ack.is_exception());
    assert!(ack.data().is_empty());
}
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
/// Builds the raw bytes of a function-code-0x03 response frame.
///
/// Layout (all multi-byte fields big-endian):
/// transaction id (2) · protocol id `0` (2) · length (2) · slave id (1) ·
/// function code `0x03` (1) · byte count (1) · register values (2 each).
///
/// The length field counts everything after itself: slave id, function code,
/// byte count and the register data.
///
/// # Panics
///
/// Panics if `values` holds more than 127 registers, because the payload size
/// would no longer fit the one-byte byte-count field. Previously the size was
/// silently truncated, yielding a malformed frame.
fn build_fc03_response_frame(tid: u16, slave_id: u8, values: &[u16]) -> Vec<u8> {
    const HEADER_LEN: usize = 6;
    const FUNCTION_CODE: u8 = 0x03;

    let data_len = values.len() * 2;
    assert!(
        data_len <= usize::from(u8::MAX),
        "FC03 response payload of {data_len} bytes does not fit the one-byte byte-count field"
    );
    // Both casts below are lossless thanks to the check above.
    let byte_count = data_len as u8;
    // slave id + function code + byte count + register data
    let pdu_len = (3 + data_len) as u16;

    let mut frame = Vec::with_capacity(HEADER_LEN + usize::from(pdu_len));
    frame.extend_from_slice(&tid.to_be_bytes());
    frame.extend_from_slice(&0u16.to_be_bytes());
    frame.extend_from_slice(&pdu_len.to_be_bytes());
    frame.push(slave_id);
    frame.push(FUNCTION_CODE);
    frame.push(byte_count);
    for &register in values {
        frame.extend_from_slice(&register.to_be_bytes());
    }
    frame
}
/// Builds the raw bytes of a function-code-0x06 response frame.
///
/// Layout (all multi-byte fields big-endian):
/// transaction id (2) · protocol id `0` (2) · length `6` (2) · slave id (1) ·
/// function code `0x06` (1) · register address (2) · register value (2).
fn build_fc06_response_frame(tid: u16, slave_id: u8, address: u16, value: u16) -> Vec<u8> {
    // slave id + function code + address + value
    const LENGTH_FIELD: u16 = 6;
    const PROTOCOL_ID: u16 = 0;
    const FUNCTION_CODE: u8 = 0x06;

    let [tid_hi, tid_lo] = tid.to_be_bytes();
    let [proto_hi, proto_lo] = PROTOCOL_ID.to_be_bytes();
    let [len_hi, len_lo] = LENGTH_FIELD.to_be_bytes();
    let [addr_hi, addr_lo] = address.to_be_bytes();
    let [value_hi, value_lo] = value.to_be_bytes();

    vec![
        tid_hi,
        tid_lo,
        proto_hi,
        proto_lo,
        len_hi,
        len_lo,
        slave_id,
        FUNCTION_CODE,
        addr_hi,
        addr_lo,
        value_hi,
        value_lo,
    ]
}
/// Starts a one-shot TCP server on an ephemeral localhost port for pipeline tests.
///
/// The spawned task accepts a single connection and reads `request_count`
/// frames from it. For each frame it reads a 6-byte header (bytes 0-1 are the
/// transaction id, bytes 4-5 the length of what follows), then that many
/// further bytes, of which the first is the slave id and the second the
/// function code. Once all frames are in, the collected
/// `(transaction id, slave id, function code)` tuples are passed to `handler`,
/// and whatever bytes it returns are written back to the socket in one go.
///
/// Returns the bound address together with the server task's join handle.
/// Any I/O failure panics inside the task, so it surfaces when the handle is
/// awaited.
async fn spawn_mock_server<H, Fut>(
    request_count: usize,
    handler: H,
) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>)
where
    H: FnOnce(Vec<(u16, u8, u8)>) -> Fut + Send + 'static,
    Fut: std::future::Future<Output = Vec<u8>> + Send,
{
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = listener.local_addr().unwrap();

    let server_task = tokio::spawn(async move {
        let (mut conn, _peer) = listener.accept().await.unwrap();

        let mut seen: Vec<(u16, u8, u8)> = Vec::new();
        for _ in 0..request_count {
            let mut header = [0u8; 6];
            conn.read_exact(&mut header).await.unwrap();
            let transaction_id = u16::from_be_bytes([header[0], header[1]]);
            let body_len = usize::from(u16::from_be_bytes([header[4], header[5]]));

            let mut body = vec![0u8; body_len];
            conn.read_exact(&mut body).await.unwrap();
            let unit = body[0];
            let function_code = body[1];
            seen.push((transaction_id, unit, function_code));
        }

        // All requests are in; reply with whatever the test scripted.
        let reply = handler(seen).await;
        conn.write_all(&reply).await.unwrap();
    });

    (bound_addr, server_task)
}
/// Pipelining an empty request list must succeed and return no results.
#[tokio::test]
async fn test_pipeline_empty() {
    let timeout = Duration::from_secs(5);
    let (server_addr, _handle) = spawn_mock_server(0, |_meta| async { Vec::new() }).await;

    let mut client = ModbusTcpClient::new(server_addr, timeout).await.unwrap();
    let results = client.pipeline(Vec::new(), timeout).await.unwrap();

    assert!(results.is_empty());
}
/// A pipeline containing one read request must return that request's
/// registers as served by the mock server.
#[tokio::test]
async fn test_pipeline_single() {
    let timeout = Duration::from_secs(5);

    // The server answers the lone request with five registers.
    let (server_addr, server_handle) = spawn_mock_server(1, |meta| async move {
        let (tid, slave_id, _func) = meta[0];
        build_fc03_response_frame(tid, slave_id, &[1, 2, 3, 4, 5])
    })
    .await;

    let mut client = ModbusTcpClient::new(server_addr, timeout).await.unwrap();
    let read = ModbusRequest::new_read(1, ModbusFunction::ReadHoldingRegisters, 0, 5);
    let results = client.pipeline(vec![read], timeout).await.unwrap();

    assert_eq!(results.len(), 1);
    let only = results[0].as_ref().unwrap();
    let registers = only.parse_registers().unwrap();
    assert_eq!(registers, vec![1, 2, 3, 4, 5]);

    server_handle.await.unwrap();
}
/// Three pipelined reads, answered in request order, must come back as three
/// results carrying the matching register payloads.
#[tokio::test]
async fn test_pipeline_basic() {
    let timeout = Duration::from_secs(5);

    // The server replies to each request, in order, with its own payload.
    let (server_addr, server_handle) = spawn_mock_server(3, |meta| async move {
        let payloads: Vec<Vec<u16>> = vec![vec![10, 11, 12], vec![20, 21], vec![30, 31, 32, 33]];
        let mut reply = Vec::new();
        for ((tid, slave_id, _func), registers) in meta.iter().zip(&payloads) {
            reply.extend(build_fc03_response_frame(*tid, *slave_id, registers));
        }
        reply
    })
    .await;

    let mut client = ModbusTcpClient::new(server_addr, timeout).await.unwrap();
    let reads = [(0, 3), (100, 2), (200, 4)]
        .into_iter()
        .map(|(address, quantity)| {
            ModbusRequest::new_read(1, ModbusFunction::ReadHoldingRegisters, address, quantity)
        })
        .collect();
    let results = client.pipeline(reads, timeout).await.unwrap();

    assert_eq!(results.len(), 3);
    let registers_at = |idx: usize| results[idx].as_ref().unwrap().parse_registers().unwrap();
    assert_eq!(registers_at(0), vec![10, 11, 12]);
    assert_eq!(registers_at(1), vec![20, 21]);
    assert_eq!(registers_at(2), vec![30, 31, 32, 33]);

    server_handle.await.unwrap();
}
/// A pipeline mixing a register read with a single-register write must
/// return the read's registers and an `Ok` result for the write.
#[tokio::test]
async fn test_pipeline_mixed() {
    let timeout = Duration::from_secs(5);

    // First reply answers the read, second one answers the write.
    let (server_addr, server_handle) = spawn_mock_server(2, |meta| async move {
        let (read_tid, read_slave, _) = meta[0];
        let (write_tid, write_slave, _) = meta[1];
        let mut reply = build_fc03_response_frame(read_tid, read_slave, &[42, 43]);
        reply.extend(build_fc06_response_frame(write_tid, write_slave, 200, 0x1234));
        reply
    })
    .await;

    let mut client = ModbusTcpClient::new(server_addr, timeout).await.unwrap();
    let read = ModbusRequest::new_read(1, ModbusFunction::ReadHoldingRegisters, 0, 2);
    let write = ModbusRequest::new_write(
        1,
        ModbusFunction::WriteSingleRegister,
        200,
        vec![0x12, 0x34],
    );
    let results = client.pipeline(vec![read, write], timeout).await.unwrap();

    assert_eq!(results.len(), 2);
    let read_response = results[0].as_ref().unwrap();
    assert_eq!(read_response.parse_registers().unwrap(), vec![42, 43]);
    assert!(results[1].is_ok());

    server_handle.await.unwrap();
}
/// `pipeline_reads` must turn `(address, quantity)` pairs into pipelined
/// reads and return the register values for each pair.
#[tokio::test]
async fn test_pipeline_reads_convenience() {
    let timeout = Duration::from_secs(5);

    // The server replies to each request, in order, with its own payload.
    let (server_addr, server_handle) = spawn_mock_server(2, |meta| async move {
        let payloads = vec![vec![1u16, 2, 3], vec![4u16, 5]];
        let mut reply = Vec::new();
        for ((tid, slave_id, _), registers) in meta.iter().zip(&payloads) {
            reply.extend(build_fc03_response_frame(*tid, *slave_id, registers));
        }
        reply
    })
    .await;

    let mut client = ModbusTcpClient::new(server_addr, timeout).await.unwrap();
    let ranges = [(0, 3), (100, 2)];
    let results = client.pipeline_reads(1, &ranges, timeout).await.unwrap();

    assert_eq!(results.len(), 2);
    assert_eq!(results[0].as_ref().unwrap(), &[1, 2, 3]);
    assert_eq!(results[1].as_ref().unwrap(), &[4, 5]);

    server_handle.await.unwrap();
}
/// When the server answers the second request before the first, the
/// pipeline must still return results in request order.
#[tokio::test]
async fn test_pipeline_out_of_order_responses() {
    let timeout = Duration::from_secs(5);

    // Deliberately emit the reply for request #1 ahead of the one for #0.
    let (server_addr, server_handle) = spawn_mock_server(2, |meta| async move {
        let (first_tid, first_slave, _) = meta[0];
        let (second_tid, second_slave, _) = meta[1];
        let mut reply = build_fc03_response_frame(second_tid, second_slave, &[200u16, 201]);
        reply.extend(build_fc03_response_frame(first_tid, first_slave, &[100u16, 101]));
        reply
    })
    .await;

    let mut client = ModbusTcpClient::new(server_addr, timeout).await.unwrap();
    let first = ModbusRequest::new_read(1, ModbusFunction::ReadHoldingRegisters, 0, 2);
    let second = ModbusRequest::new_read(1, ModbusFunction::ReadHoldingRegisters, 10, 2);
    let results = client.pipeline(vec![first, second], timeout).await.unwrap();

    assert_eq!(results.len(), 2);
    let registers_at = |idx: usize| results[idx].as_ref().unwrap().parse_registers().unwrap();
    assert_eq!(registers_at(0), vec![100u16, 101]);
    assert_eq!(registers_at(1), vec![200u16, 201]);

    server_handle.await.unwrap();
}
}
/// Smoke tests for the RTU client. They only print outcomes and never assert,
/// so they pass whether or not a serial port or device is present.
#[cfg(all(test, feature = "rtu"))]
mod rtu_tests {
    use super::*;
    use std::time::Duration;

    /// Exercises both constructors and reports whether each one succeeded.
    #[test]
    fn test_rtu_client_creation() {
        let basic = ModbusRtuClient::new("/dev/ttyUSB0", 9600);
        println!("RTU client creation result: {:?}", basic.is_ok());

        let configured = ModbusRtuClient::with_config_and_logging(
            "/dev/ttyUSB0",
            9600,
            tokio_serial::DataBits::Eight,
            tokio_serial::StopBits::One,
            tokio_serial::Parity::None,
            Duration::from_secs(1),
            None,
        );
        println!(
            "RTU client with config creation result: {:?}",
            configured.is_ok()
        );
    }

    /// Attempts a coil read bounded by a 100 ms timeout when a client can be
    /// created, reporting the outcome, then closes the client.
    #[tokio::test]
    async fn test_rtu_client_operations() {
        let Ok(mut client) = ModbusRtuClient::new("/dev/ttyUSB0", 9600) else {
            println!("RTU client creation failed (expected without serial port)");
            return;
        };

        println!("RTU client connected: {}", client.is_connected());

        let deadline = Duration::from_millis(100);
        match tokio::time::timeout(deadline, client.read_01(1, 0, 8)).await {
            Ok(Ok(coils)) => println!("Successfully read {} coils", coils.len()),
            Ok(Err(e)) => println!("Read operation failed (expected without device): {}", e),
            Err(_) => println!("Read operation timed out (expected without device)"),
        }

        // Best-effort shutdown; a failure here is not interesting to the test.
        let _ = client.close().await;
    }

    /// Tries the basic constructor against several port/baud combinations and
    /// reports whether each attempt succeeded.
    #[test]
    fn test_rtu_client_configuration() {
        for (port, baud) in [
            ("/dev/ttyUSB0", 9600),
            ("/dev/ttyUSB1", 19200),
            ("/dev/ttyS0", 38400),
            ("COM1", 115200),
        ] {
            let attempt = ModbusRtuClient::new(port, baud);
            println!(
                "RTU client creation for {} at {} baud: {}",
                port,
                baud,
                attempt.is_ok()
            );
        }
    }
}