use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use mbus_core::errors::MbusError;
use mbus_core::transport::{AsyncTransport, ModbusConfig};
#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
use mbus_core::transport::{ModbusSerialConfig, SerialMode};
#[cfg(feature = "serial-ascii")]
use mbus_serial::TokioAsciiTransport;
#[cfg(feature = "serial-rtu")]
use mbus_serial::TokioRtuTransport;
use tokio::sync::{mpsc, watch};
use super::{AsyncClientCore, AsyncError};
use crate::client::task::{ClientTask, ConnectFactory};
/// Async Modbus client handle for serial transports.
///
/// This is a thin wrapper around [`AsyncClientCore`]; the `Deref` impl in this
/// file exposes the core's methods directly on the client.
pub struct AsyncSerialClient {
/// Shared client core that this wrapper dereferences to.
core: AsyncClientCore,
}
impl Deref for AsyncSerialClient {
type Target = AsyncClientCore;
fn deref(&self) -> &Self::Target {
&self.core
}
}
impl AsyncSerialClient {
    /// Deprecated alias that forwards to [`AsyncSerialClient::new_rtu`].
    ///
    /// # Errors
    /// Returns whatever [`AsyncSerialClient::new_rtu`] returns.
    #[cfg(feature = "serial-rtu")]
    #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
    pub fn connect_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
        AsyncSerialClient::new_rtu(serial_config)
    }

    /// Deprecated alias that forwards to [`AsyncSerialClient::new_rtu`].
    ///
    /// `_poll_interval` is accepted but not used.
    ///
    /// # Errors
    /// Returns whatever [`AsyncSerialClient::new_rtu`] returns.
    #[cfg(feature = "serial-rtu")]
    #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
    pub fn connect_rtu_with_poll_interval(
        serial_config: ModbusSerialConfig,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        AsyncSerialClient::new_rtu(serial_config)
    }

    /// Deprecated alias that forwards to [`AsyncSerialClient::new_ascii`].
    ///
    /// # Errors
    /// Returns whatever [`AsyncSerialClient::new_ascii`] returns.
    #[cfg(feature = "serial-ascii")]
    #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
    pub fn connect_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
        AsyncSerialClient::new_ascii(serial_config)
    }

    /// Deprecated alias that forwards to [`AsyncSerialClient::new_ascii`].
    ///
    /// `_poll_interval` is accepted but not used.
    ///
    /// # Errors
    /// Returns whatever [`AsyncSerialClient::new_ascii`] returns.
    #[cfg(feature = "serial-ascii")]
    #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
    pub fn connect_ascii_with_poll_interval(
        serial_config: ModbusSerialConfig,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        AsyncSerialClient::new_ascii(serial_config)
    }

    /// Deprecated alias that forwards all arguments to
    /// [`AsyncSerialClient::new_with_transport`].
    ///
    /// # Errors
    /// Returns whatever [`AsyncSerialClient::new_with_transport`] returns.
    #[deprecated(
        note = "use AsyncSerialClient::new_with_transport(...) and then client.connect().await"
    )]
    pub fn connect_with_transport<T>(
        transport: T,
        config: ModbusConfig,
        poll_interval: Duration,
    ) -> Result<Self, AsyncError>
    where
        T: AsyncTransport + Send + 'static,
    {
        Self::new_with_transport::<T>(transport, config, poll_interval)
    }

    /// Creates a client whose worker task opens a [`TokioRtuTransport`] from
    /// `serial_config`.
    ///
    /// # Errors
    /// * `AsyncError::Mbus(MbusError::InvalidConfiguration)` when
    ///   `serial_config.mode` is not `SerialMode::Rtu`.
    /// * Any error produced while spawning the worker task (see
    ///   `spawn_serial_task`).
    #[cfg(feature = "serial-rtu")]
    pub fn new_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
        if serial_config.mode == SerialMode::Rtu {
            make_rtu_client(ModbusConfig::Serial(serial_config))
        } else {
            Err(AsyncError::Mbus(MbusError::InvalidConfiguration))
        }
    }

    /// Same as [`AsyncSerialClient::new_rtu`]; `_poll_interval` is accepted but
    /// not used.
    ///
    /// # Errors
    /// Returns whatever [`AsyncSerialClient::new_rtu`] returns.
    #[cfg(feature = "serial-rtu")]
    pub fn new_rtu_with_poll_interval(
        serial_config: ModbusSerialConfig,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        AsyncSerialClient::new_rtu(serial_config)
    }

    /// Creates a client whose worker task opens a [`TokioAsciiTransport`] from
    /// `serial_config`.
    ///
    /// # Errors
    /// * `AsyncError::Mbus(MbusError::InvalidConfiguration)` when
    ///   `serial_config.mode` is not `SerialMode::Ascii`.
    /// * Any error produced while spawning the worker task (see
    ///   `spawn_serial_task`).
    #[cfg(feature = "serial-ascii")]
    pub fn new_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
        if serial_config.mode == SerialMode::Ascii {
            make_ascii_client(ModbusConfig::Serial(serial_config))
        } else {
            Err(AsyncError::Mbus(MbusError::InvalidConfiguration))
        }
    }

    /// Same as [`AsyncSerialClient::new_ascii`]; `_poll_interval` is accepted
    /// but not used.
    ///
    /// # Errors
    /// Returns whatever [`AsyncSerialClient::new_ascii`] returns.
    #[cfg(feature = "serial-ascii")]
    pub fn new_ascii_with_poll_interval(
        serial_config: ModbusSerialConfig,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        AsyncSerialClient::new_ascii(serial_config)
    }

    /// Creates a client around an already-constructed `transport`.
    ///
    /// `config` is only inspected to verify that it is the serial variant, and
    /// `_poll_interval` is accepted but not used.
    ///
    /// The connect factory handed to the worker yields `transport` the first
    /// time it is invoked; every later invocation resolves to
    /// `MbusError::ConnectionClosed` because the slot is then empty.
    ///
    /// # Errors
    /// * `AsyncError::Mbus(MbusError::InvalidTransport)` when `config` is not
    ///   `ModbusConfig::Serial`.
    /// * Any error produced while spawning the worker task (see
    ///   `spawn_serial_task`).
    pub fn new_with_transport<T>(
        transport: T,
        config: ModbusConfig,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError>
    where
        T: AsyncTransport + Send + 'static,
    {
        let is_serial = matches!(config, ModbusConfig::Serial(_));
        if !is_serial {
            return Err(AsyncError::Mbus(MbusError::InvalidTransport));
        }

        // One-shot slot: the transport is moved out on the first connect.
        let transport_slot = Arc::new(std::sync::Mutex::new(Some(transport)));
        let one_shot_factory: ConnectFactory<T> = Box::new(move || {
            let shared_slot = Arc::clone(&transport_slot);
            Box::pin(async move {
                // The guard only ever lives for the duration of `take()`, which
                // cannot panic, so the mutex is not expected to be poisoned.
                let mut pending = shared_slot.lock().unwrap();
                pending.take().ok_or(MbusError::ConnectionClosed)
            })
        });

        spawn_serial_task(one_shot_factory)
    }
}
/// Spawns the background [`ClientTask`] on the current Tokio runtime and
/// returns the [`AsyncSerialClient`] handle wired to it.
///
/// `connect_fn` is handed to the task unchanged. The task and the returned
/// client share a bounded command channel and a `watch` channel carrying a
/// `usize` that starts at `0`; with the `traffic` feature enabled they also
/// share a notifier store.
///
/// # Errors
/// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
/// available in the calling context.
fn spawn_serial_task<T: AsyncTransport + Send + 'static>(
    connect_fn: ConnectFactory<T>,
) -> Result<AsyncSerialClient, AsyncError> {
    /// Capacity of the bounded command channel between client and task.
    const COMMAND_QUEUE_DEPTH: usize = 64;

    let runtime = match tokio::runtime::Handle::try_current() {
        Ok(current) => current,
        Err(_) => return Err(AsyncError::WorkerClosed),
    };

    let (command_tx, command_rx) = mpsc::channel(COMMAND_QUEUE_DEPTH);
    let (pending_tx, pending_rx) = watch::channel::<usize>(0);
    #[cfg(feature = "traffic")]
    let notifier = crate::client::notifier::new_notifier_store();

    // NOTE(review): the const generic `1` is defined by `ClientTask`; its
    // meaning is not visible from this file — confirm against its definition.
    let worker = ClientTask::<T, 1>::new(
        connect_fn,
        command_rx,
        pending_tx,
        #[cfg(feature = "traffic")]
        notifier.clone(),
    );
    // The join handle is dropped on purpose: the task runs detached.
    runtime.spawn(worker.run());

    let core = AsyncClientCore::new(
        command_tx,
        pending_rx,
        #[cfg(feature = "traffic")]
        notifier,
    );
    Ok(AsyncSerialClient { core })
}
/// Builds a connect factory that constructs a fresh [`TokioRtuTransport`] from
/// the shared `config` each time it is invoked.
#[cfg(feature = "serial-rtu")]
fn make_rtu_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioRtuTransport> {
    Box::new(move || {
        // Each returned future owns its own handle to the shared configuration.
        let shared_config = Arc::clone(&config);
        Box::pin(async move { TokioRtuTransport::new(&shared_config) })
    })
}
/// Builds a connect factory that constructs a fresh [`TokioAsciiTransport`]
/// from the shared `config` each time it is invoked.
#[cfg(feature = "serial-ascii")]
fn make_ascii_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioAsciiTransport> {
    Box::new(move || {
        // Each returned future owns its own handle to the shared configuration.
        let shared_config = Arc::clone(&config);
        Box::pin(async move { TokioAsciiTransport::new(&shared_config) })
    })
}
/// Wraps `config` in an `Arc`, builds the RTU connect factory from it, and
/// spawns the worker task.
///
/// # Errors
/// Returns whatever `spawn_serial_task` returns.
#[cfg(feature = "serial-rtu")]
fn make_rtu_client(config: ModbusConfig) -> Result<AsyncSerialClient, AsyncError> {
    let shared_config = Arc::new(config);
    let connect_fn = make_rtu_factory(shared_config);
    spawn_serial_task(connect_fn)
}
/// Wraps `config` in an `Arc`, builds the ASCII connect factory from it, and
/// spawns the worker task.
///
/// # Errors
/// Returns whatever `spawn_serial_task` returns.
#[cfg(feature = "serial-ascii")]
fn make_ascii_client(config: ModbusConfig) -> Result<AsyncSerialClient, AsyncError> {
    let shared_config = Arc::new(config);
    let connect_fn = make_ascii_factory(shared_config);
    spawn_serial_task(connect_fn)
}