use super::*;
/// Client-side handle that submits `WorkerCommand`s over a channel and awaits
/// the matching replies.
///
/// Dropping the handle sends `WorkerCommand::Shutdown` on the same channel.
pub struct AsyncClientCore {
/// Channel on which commands are handed to the worker.
sender: Sender<WorkerCommand>,
/// Counter from which per-request transaction ids are drawn (starts at 1).
next_txn_id: AtomicU16,
/// Lockable slot holding the optional traffic callback; only present with
/// the `traffic` feature.
#[cfg(feature = "traffic")]
traffic_handler: TrafficHandlerStore,
}
impl AsyncClientCore {
/// Builds a core that submits commands through `sender` and keeps its traffic
/// callback in `traffic_handler`.
///
/// The transaction-id counter starts at 1. Only compiled with the `traffic`
/// feature.
#[cfg(feature = "traffic")]
pub(super) fn new(sender: Sender<WorkerCommand>, traffic_handler: TrafficHandlerStore) -> Self {
    let next_txn_id = AtomicU16::new(1);
    Self {
        traffic_handler,
        next_txn_id,
        sender,
    }
}
/// Builds a core that submits commands through `sender`.
///
/// The transaction-id counter starts at 1. Only compiled when the `traffic`
/// feature is disabled.
#[cfg(not(feature = "traffic"))]
pub(super) fn new(sender: Sender<WorkerCommand>) -> Self {
    let next_txn_id = AtomicU16::new(1);
    Self {
        next_txn_id,
        sender,
    }
}
/// Hands out the current transaction id and advances the counter by one.
///
/// `fetch_add` wraps on overflow, so ids repeat once the `u16` range has been
/// exhausted.
fn next_txn_id(&self) -> u16 {
    let counter = &self.next_txn_id;
    counter.fetch_add(1, Ordering::Relaxed)
}
/// Sends one command to the worker and waits for its reply.
///
/// `build` receives the sending half of a fresh one-shot channel and must
/// return the command that carries it; the reply is awaited on the receiving
/// half.
///
/// # Errors
///
/// Returns `AsyncError::WorkerClosed` when the command cannot be sent or the
/// reply channel is closed without a reply. An error reported by the worker
/// is converted with `AsyncError::from`.
async fn request_with<F>(&self, build: F) -> Result<WorkerResponse, AsyncError>
where
    F: FnOnce(PendingSender) -> WorkerCommand,
{
    let (reply_tx, reply_rx) = oneshot::channel();
    if self.sender.send(build(reply_tx)).is_err() {
        return Err(AsyncError::WorkerClosed);
    }
    match reply_rx.await {
        Ok(outcome) => outcome.map_err(AsyncError::from),
        Err(_) => Err(AsyncError::WorkerClosed),
    }
}
/// Sends a `Connect` command to the worker and waits for its `Ack`.
///
/// # Errors
///
/// Propagates any request failure, and returns
/// `AsyncError::UnexpectedResponseType` when the reply is not `Ack`.
pub async fn connect(&self) -> Result<(), AsyncError> {
    let reply = self
        .request_with(|reply_tx| WorkerCommand::Connect { sender: reply_tx })
        .await?;
    if let WorkerResponse::Ack = reply {
        Ok(())
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Sends a `HasPendingRequests` command and returns the flag carried by the
/// worker's reply.
///
/// # Errors
///
/// Propagates any request failure, and returns
/// `AsyncError::UnexpectedResponseType` when the reply is a different variant.
pub async fn has_pending_requests(&self) -> Result<bool, AsyncError> {
    let reply = self
        .request_with(|reply_tx| WorkerCommand::HasPendingRequests { sender: reply_tx })
        .await?;
    if let WorkerResponse::HasPendingRequests(pending) = reply {
        Ok(pending)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Installs `handler` as the traffic callback, replacing any previous one.
///
/// The slot only ever holds an `Option`, so overwriting it is sound even if
/// another holder of the lock panicked. A poisoned lock is therefore
/// recovered instead of silently discarding `handler`.
///
/// Only compiled with the `traffic` feature.
#[cfg(feature = "traffic")]
pub fn set_traffic_handler<F>(&self, handler: F)
where
    F: FnMut(&TrafficEvent) + Send + 'static,
{
    // NOTE(review): assumes the lock error is a std-style poison error that
    // can hand back the guard via `into_inner` — confirm against
    // `TrafficHandlerStore`.
    let mut slot = self
        .traffic_handler
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *slot = Some(Box::new(handler));
}
/// Removes the traffic callback, if one is installed.
///
/// The slot only ever holds an `Option`, so resetting it is sound even if
/// another holder of the lock panicked. A poisoned lock is therefore
/// recovered so the previous handler is always dropped.
///
/// Only compiled with the `traffic` feature.
#[cfg(feature = "traffic")]
pub fn clear_traffic_handler(&self) {
    // NOTE(review): assumes the lock error is a std-style poison error that
    // can hand back the guard via `into_inner` — confirm against
    // `TrafficHandlerStore`.
    let mut slot = self
        .traffic_handler
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *slot = None;
}
/// Issues a `ReadMultipleCoils` command for `unit_id`, forwarding `address`
/// and `quantity` unchanged, and returns the `Coils` carried by the reply.
///
/// Only compiled with the `coils` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::Coils`.
#[cfg(feature = "coils")]
pub async fn read_multiple_coils(
    &self,
    unit_id: u8,
    address: u16,
    quantity: u16,
) -> Result<Coils, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReadMultipleCoils {
            txn_id: self.next_txn_id(),
            unit,
            address,
            quantity,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::Coils(coils) = reply {
        Ok(coils)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `WriteSingleCoil` command for `unit_id`, forwarding `address` and
/// `value` unchanged.
///
/// On success returns the reply's starting address together with the coil
/// state stored at that address in the reply. If that state cannot be looked
/// up, `false` is reported instead.
///
/// Only compiled with the `coils` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::Coils`.
#[cfg(feature = "coils")]
pub async fn write_single_coil(
    &self,
    unit_id: u8,
    address: u16,
    value: bool,
) -> Result<(u16, bool), AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::WriteSingleCoil {
            txn_id: self.next_txn_id(),
            unit,
            address,
            value,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::Coils(echoed) = reply {
        let start = echoed.from_address();
        // A failed lookup is reported as `false`, not as an error.
        let state = echoed.value(start).unwrap_or(false);
        Ok((start, state))
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `WriteMultipleCoils` command for `unit_id`, forwarding `address`
/// and a clone of `coils`.
///
/// On success returns the starting address and quantity reported by the reply.
///
/// Only compiled with the `coils` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::Coils`.
#[cfg(feature = "coils")]
pub async fn write_multiple_coils(
    &self,
    unit_id: u8,
    address: u16,
    coils: &Coils,
) -> Result<(u16, u16), AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::WriteMultipleCoils {
            txn_id: self.next_txn_id(),
            unit,
            address,
            coils: coils.clone(),
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::Coils(written) = reply {
        Ok((written.from_address(), written.quantity()))
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `ReadHoldingRegisters` command for `unit_id`, forwarding `address`
/// and `quantity` unchanged, and returns the `Registers` carried by the reply.
///
/// Only compiled with the `registers` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::Registers`.
#[cfg(feature = "registers")]
pub async fn read_holding_registers(
    &self,
    unit_id: u8,
    address: u16,
    quantity: u16,
) -> Result<Registers, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReadHoldingRegisters {
            txn_id: self.next_txn_id(),
            unit,
            address,
            quantity,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::Registers(regs) = reply {
        Ok(regs)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `ReadInputRegisters` command for `unit_id`, forwarding `address`
/// and `quantity` unchanged, and returns the `Registers` carried by the reply.
///
/// Only compiled with the `registers` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::Registers`.
#[cfg(feature = "registers")]
pub async fn read_input_registers(
    &self,
    unit_id: u8,
    address: u16,
    quantity: u16,
) -> Result<Registers, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReadInputRegisters {
            txn_id: self.next_txn_id(),
            unit,
            address,
            quantity,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::Registers(regs) = reply {
        Ok(regs)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `WriteSingleRegister` command for `unit_id`, forwarding `address`
/// and `value` unchanged.
///
/// On success returns the address and value carried by the reply.
///
/// Only compiled with the `registers` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::SingleRegisterWrite`.
#[cfg(feature = "registers")]
pub async fn write_single_register(
    &self,
    unit_id: u8,
    address: u16,
    value: u16,
) -> Result<(u16, u16), AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::WriteSingleRegister {
            txn_id: self.next_txn_id(),
            unit,
            address,
            value,
            sender: reply_tx,
        })
        .await?;
    match reply {
        WorkerResponse::SingleRegisterWrite {
            address: echoed_address,
            value: echoed_value,
        } => Ok((echoed_address, echoed_value)),
        _ => Err(AsyncError::UnexpectedResponseType),
    }
}
/// Issues a `WriteMultipleRegisters` command for `unit_id`, forwarding
/// `address` and an owned copy of `values`.
///
/// On success returns the starting address and quantity reported by the reply.
///
/// Only compiled with the `registers` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::Registers`.
#[cfg(feature = "registers")]
pub async fn write_multiple_registers(
    &self,
    unit_id: u8,
    address: u16,
    values: &[u16],
) -> Result<(u16, u16), AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::WriteMultipleRegisters {
            txn_id: self.next_txn_id(),
            unit,
            address,
            values: values.to_vec(),
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::Registers(written) = reply {
        Ok((written.from_address(), written.quantity()))
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `ReadWriteMultipleRegisters` command for `unit_id`, forwarding the
/// read range, the write address, and an owned copy of `write_values`.
///
/// On success returns the `Registers` carried by the reply.
///
/// Only compiled with the `registers` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::Registers`.
#[cfg(feature = "registers")]
pub async fn read_write_multiple_registers(
    &self,
    unit_id: u8,
    read_address: u16,
    read_quantity: u16,
    write_address: u16,
    write_values: &[u16],
) -> Result<Registers, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReadWriteMultipleRegisters {
            txn_id: self.next_txn_id(),
            unit,
            read_address,
            read_quantity,
            write_address,
            write_values: write_values.to_vec(),
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::Registers(read_back) = reply {
        Ok(read_back)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `MaskWriteRegister` command for `unit_id`, forwarding `address`,
/// `and_mask` and `or_mask` unchanged.
///
/// Only compiled with the `registers` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::MaskWriteRegister`.
#[cfg(feature = "registers")]
pub async fn mask_write_register(
    &self,
    unit_id: u8,
    address: u16,
    and_mask: u16,
    or_mask: u16,
) -> Result<(), AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::MaskWriteRegister {
            txn_id: self.next_txn_id(),
            unit,
            address,
            and_mask,
            or_mask,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::MaskWriteRegister = reply {
        Ok(())
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `ReadDiscreteInputs` command for `unit_id`, forwarding `address`
/// and `quantity` unchanged, and returns the `DiscreteInputs` carried by the
/// reply.
///
/// Only compiled with the `discrete-inputs` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::DiscreteInputs`.
#[cfg(feature = "discrete-inputs")]
pub async fn read_discrete_inputs(
    &self,
    unit_id: u8,
    address: u16,
    quantity: u16,
) -> Result<DiscreteInputs, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReadDiscreteInputs {
            txn_id: self.next_txn_id(),
            unit,
            address,
            quantity,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::DiscreteInputs(inputs) = reply {
        Ok(inputs)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `ReadFifoQueue` command for `unit_id`, forwarding `address`
/// unchanged, and returns the `FifoQueue` carried by the reply.
///
/// Only compiled with the `fifo` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::FifoQueue`.
#[cfg(feature = "fifo")]
pub async fn read_fifo_queue(
    &self,
    unit_id: u8,
    address: u16,
) -> Result<FifoQueue, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReadFifoQueue {
            txn_id: self.next_txn_id(),
            unit,
            address,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::FifoQueue(fifo) = reply {
        Ok(fifo)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `ReadFileRecord` command for `unit_id` with a clone of
/// `sub_request`, and returns the data carried by the reply.
///
/// Only compiled with the `file-record` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::FileRecordRead`.
#[cfg(feature = "file-record")]
pub async fn read_file_record(
    &self,
    unit_id: u8,
    sub_request: &SubRequest,
) -> Result<Vec<SubRequestParams>, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReadFileRecord {
            txn_id: self.next_txn_id(),
            unit,
            sub_request: sub_request.clone(),
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::FileRecordRead(records) = reply {
        Ok(records)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `WriteFileRecord` command for `unit_id` with a clone of
/// `sub_request`.
///
/// Only compiled with the `file-record` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::FileRecordWrite`.
#[cfg(feature = "file-record")]
pub async fn write_file_record(
    &self,
    unit_id: u8,
    sub_request: &SubRequest,
) -> Result<(), AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::WriteFileRecord {
            txn_id: self.next_txn_id(),
            unit,
            sub_request: sub_request.clone(),
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::FileRecordWrite = reply {
        Ok(())
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `ReadDeviceIdentification` command for `unit_id`, forwarding
/// `read_device_id_code` and `object_id` unchanged, and returns the response
/// carried by the reply.
///
/// Only compiled with the `diagnostics` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::DeviceIdentification`.
#[cfg(feature = "diagnostics")]
pub async fn read_device_identification(
    &self,
    unit_id: u8,
    read_device_id_code: ReadDeviceIdCode,
    object_id: ObjectId,
) -> Result<DeviceIdentificationResponse, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReadDeviceIdentification {
            txn_id: self.next_txn_id(),
            unit,
            read_device_id_code,
            object_id,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::DeviceIdentification(identification) = reply {
        Ok(identification)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues an `EncapsulatedInterfaceTransport` command for `unit_id`,
/// forwarding `mei_type` and an owned copy of `data`.
///
/// On success returns the interface type and payload carried by the reply.
///
/// Only compiled with the `diagnostics` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::EncapsulatedInterfaceTransport`.
#[cfg(feature = "diagnostics")]
pub async fn encapsulated_interface_transport(
    &self,
    unit_id: u8,
    mei_type: EncapsulatedInterfaceType,
    data: &[u8],
) -> Result<(EncapsulatedInterfaceType, Vec<u8>), AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::EncapsulatedInterfaceTransport {
            txn_id: self.next_txn_id(),
            unit,
            mei_type,
            data: data.to_vec(),
            sender: reply_tx,
        })
        .await?;
    match reply {
        WorkerResponse::EncapsulatedInterfaceTransport {
            mei_type: reply_type,
            data: reply_payload,
        } => Ok((reply_type, reply_payload)),
        _ => Err(AsyncError::UnexpectedResponseType),
    }
}
/// Issues a `ReadExceptionStatus` command for `unit_id` and returns the status
/// byte carried by the reply.
///
/// Only compiled with the `diagnostics` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::ExceptionStatus`.
#[cfg(feature = "diagnostics")]
pub async fn read_exception_status(&self, unit_id: u8) -> Result<u8, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReadExceptionStatus {
            txn_id: self.next_txn_id(),
            unit,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::ExceptionStatus(status_byte) = reply {
        Ok(status_byte)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `Diagnostics` command for `unit_id`, forwarding `sub_function` and
/// an owned copy of `data`, and returns the response carried by the reply.
///
/// Only compiled with the `diagnostics` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::DiagnosticsData`.
#[cfg(feature = "diagnostics")]
pub async fn diagnostics(
    &self,
    unit_id: u8,
    sub_function: DiagnosticSubFunction,
    data: &[u16],
) -> Result<DiagnosticsDataResponse, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::Diagnostics {
            txn_id: self.next_txn_id(),
            unit,
            sub_function,
            data: data.to_vec(),
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::DiagnosticsData(diag) = reply {
        Ok(diag)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `GetCommEventCounter` command for `unit_id`.
///
/// On success returns the `(status, event_count)` pair carried by the reply.
///
/// Only compiled with the `diagnostics` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::CommEventCounter`.
#[cfg(feature = "diagnostics")]
pub async fn get_comm_event_counter(&self, unit_id: u8) -> Result<(u16, u16), AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::GetCommEventCounter {
            txn_id: self.next_txn_id(),
            unit,
            sender: reply_tx,
        })
        .await?;
    match reply {
        WorkerResponse::CommEventCounter {
            status: reported_status,
            event_count: reported_count,
        } => Ok((reported_status, reported_count)),
        _ => Err(AsyncError::UnexpectedResponseType),
    }
}
/// Issues a `GetCommEventLog` command for `unit_id` and returns the response
/// carried by the reply.
///
/// Only compiled with the `diagnostics` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::CommEventLog`.
#[cfg(feature = "diagnostics")]
pub async fn get_comm_event_log(
    &self,
    unit_id: u8,
) -> Result<CommEventLogResponse, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::GetCommEventLog {
            txn_id: self.next_txn_id(),
            unit,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::CommEventLog(event_log) = reply {
        Ok(event_log)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
/// Issues a `ReportServerId` command for `unit_id` and returns the bytes
/// carried by the reply.
///
/// Only compiled with the `diagnostics` feature.
///
/// # Errors
///
/// Fails when `unit_id` is rejected by `UnitIdOrSlaveAddr::new`, when the
/// request itself fails, or with `AsyncError::UnexpectedResponseType` when the
/// reply is not `WorkerResponse::ReportServerId`.
#[cfg(feature = "diagnostics")]
pub async fn report_server_id(&self, unit_id: u8) -> Result<Vec<u8>, AsyncError> {
    let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::from)?;
    let reply = self
        .request_with(|reply_tx| WorkerCommand::ReportServerId {
            txn_id: self.next_txn_id(),
            unit,
            sender: reply_tx,
        })
        .await?;
    if let WorkerResponse::ReportServerId(server_id) = reply {
        Ok(server_id)
    } else {
        Err(AsyncError::UnexpectedResponseType)
    }
}
}
/// Notifies the worker when the handle goes away.
impl Drop for AsyncClientCore {
    /// Sends `WorkerCommand::Shutdown` on the command channel; a failed send
    /// is ignored.
    fn drop(&mut self) {
        let shutdown = WorkerCommand::Shutdown;
        // The send result is deliberately discarded: there is nothing to do
        // about a failure while dropping.
        let _ = self.sender.send(shutdown);
    }
}