use std::ops::Deref;
#[cfg(feature = "network-tcp")]
use std::time::Duration;
#[cfg(feature = "network-tcp")]
use mbus_core::transport::ModbusTcpConfig;
#[cfg(feature = "network-tcp")]
use mbus_network::TokioTcpTransport;
#[cfg(feature = "network-tcp")]
use tokio::sync::{mpsc, watch};
use super::AsyncClientCore;
#[cfg(feature = "network-tcp")]
use super::AsyncError;
#[cfg(feature = "network-tcp")]
use crate::client::task::{ClientTask, ConnectFactory};
/// Asynchronous Modbus TCP client handle.
///
/// Wraps an [`AsyncClientCore`], which is exposed through the `Deref`
/// implementation so the core's methods can be called directly on this type.
///
/// `N` defaults to `9` and is forwarded as the second generic argument of the
/// background `ClientTask` that the constructors spawn.
/// NOTE(review): `N` presumably bounds the request pipeline depth — confirm
/// against `ClientTask`.
pub struct AsyncTcpClient<const N: usize = 9> {
/// Client core constructed from the command sender and the pending-count
/// receiver that pair with the spawned background task.
core: AsyncClientCore,
}
/// Lets an [`AsyncTcpClient`] be used wherever an [`AsyncClientCore`]
/// reference is expected by dereferencing to the wrapped core.
impl<const N: usize> Deref for AsyncTcpClient<N> {
    type Target = AsyncClientCore;

    /// Borrows the wrapped [`AsyncClientCore`].
    fn deref(&self) -> &AsyncClientCore {
        let Self { core } = self;
        core
    }
}
/// Constructors for the default instantiation (`N = 9`).
///
/// None of these functions awaits a connection: each one only builds a
/// connect factory for the given endpoint and hands it to a background
/// `ClientTask` spawned on the current Tokio runtime.
impl AsyncTcpClient<9> {
    /// Deprecated alias that builds the same client as [`AsyncTcpClient::new`].
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    #[deprecated(note = "use AsyncTcpClient::new(...) and then client.connect().await")]
    pub fn connect(host: &str, port: u16) -> Result<Self, AsyncError> {
        Self::new_with_pipeline(host, port)
    }

    /// Deprecated alias that builds the same client as [`AsyncTcpClient::new`].
    ///
    /// `_poll_interval` is accepted for signature compatibility and is not used.
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    #[deprecated(note = "use AsyncTcpClient::new(...) and then client.connect().await")]
    pub fn connect_with_poll_interval(
        host: &str,
        port: u16,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        Self::new_with_pipeline(host, port)
    }

    /// Creates a client for `host:port` and spawns its background task.
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    pub fn new(host: &str, port: u16) -> Result<Self, AsyncError> {
        let factory = make_tcp_factory(host.to_owned(), port);
        Self::from_connect_fn(factory)
    }

    /// Same as [`AsyncTcpClient::new`]; `_poll_interval` is accepted but not used.
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    pub fn new_with_poll_interval(
        host: &str,
        port: u16,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        Self::new_with_pipeline(host, port)
    }

    /// Creates a client from the `host` and `port` fields of `tcp_config`.
    ///
    /// Only those two fields are read here; `_poll_interval` is accepted but
    /// not used.
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    pub fn new_with_config(
        tcp_config: ModbusTcpConfig,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        let host = tcp_config.host.as_str().to_string();
        let port = tcp_config.port;
        Self::from_connect_fn(make_tcp_factory(host, port))
    }
}
/// Constructors available for every value of `N`.
///
/// `N` is forwarded to the spawned `ClientTask` as its second generic argument.
/// None of these functions awaits a connection: they only build a connect
/// factory and start the background task on the current Tokio runtime.
impl<const N: usize> AsyncTcpClient<N> {
    /// Deprecated alias for [`AsyncTcpClient::new_with_pipeline`].
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    #[deprecated(
        note = "use AsyncTcpClient::new_with_pipeline(...) and then client.connect().await"
    )]
    pub fn connect_with_pipeline(host: &str, port: u16) -> Result<Self, AsyncError> {
        let factory = make_tcp_factory(host.to_owned(), port);
        Self::from_connect_fn(factory)
    }

    /// Deprecated alias for [`AsyncTcpClient::new_with_pipeline`].
    ///
    /// `_poll_interval` is accepted for signature compatibility and is not used.
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    #[deprecated(
        note = "use AsyncTcpClient::new_with_pipeline_and_poll_interval(...) and then client.connect().await"
    )]
    pub fn connect_with_pipeline_and_poll_interval(
        host: &str,
        port: u16,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        let factory = make_tcp_factory(host.to_owned(), port);
        Self::from_connect_fn(factory)
    }

    /// Creates a client for `host:port` and spawns its background task.
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    pub fn new_with_pipeline(host: &str, port: u16) -> Result<Self, AsyncError> {
        let factory = make_tcp_factory(host.to_owned(), port);
        Self::from_connect_fn(factory)
    }

    /// Same as [`AsyncTcpClient::new_with_pipeline`]; `_poll_interval` is
    /// accepted but not used.
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    pub fn new_with_pipeline_and_poll_interval(
        host: &str,
        port: u16,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        let factory = make_tcp_factory(host.to_owned(), port);
        Self::from_connect_fn(factory)
    }

    /// Creates a client from the `host` and `port` fields of `tcp_config`.
    ///
    /// Only those two fields are read here; `_poll_interval` is accepted but
    /// not used.
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when no Tokio runtime handle is
    /// available (`Handle::try_current()` fails).
    #[cfg(feature = "network-tcp")]
    pub fn new_with_config_and_pipeline(
        tcp_config: ModbusTcpConfig,
        _poll_interval: Duration,
    ) -> Result<Self, AsyncError> {
        let host = tcp_config.host.as_str().to_string();
        let port = tcp_config.port;
        Self::from_connect_fn(make_tcp_factory(host, port))
    }

    /// Wires a connect factory to a freshly spawned `ClientTask` and returns
    /// the client handle that talks to it.
    ///
    /// The task receives the factory, the receiving end of a 64-slot command
    /// channel and the sending end of a pending-count watch channel
    /// (initialised to `0`). The returned client keeps the opposite ends.
    /// With the `traffic` feature, a notifier store is created and shared
    /// between the task and the core.
    ///
    /// # Errors
    ///
    /// Returns `AsyncError::WorkerClosed` when `Handle::try_current()` fails,
    /// i.e. there is no Tokio runtime to spawn the task on.
    #[cfg(feature = "network-tcp")]
    fn from_connect_fn(connect_fn: ConnectFactory<TokioTcpTransport>) -> Result<Self, AsyncError> {
        // Resolve the runtime first so nothing is allocated when spawning is impossible.
        let runtime = match tokio::runtime::Handle::try_current() {
            Ok(current) => current,
            Err(_) => return Err(AsyncError::WorkerClosed),
        };

        let (command_tx, command_rx) = mpsc::channel(64);
        let (pending_tx, pending_rx) = watch::channel(0usize);
        #[cfg(feature = "traffic")]
        let notifier_store = crate::client::notifier::new_notifier_store();

        let worker = ClientTask::<TokioTcpTransport, N>::new(
            connect_fn,
            command_rx,
            pending_tx,
            #[cfg(feature = "traffic")]
            notifier_store.clone(),
        );
        // The join handle is dropped on purpose: the task runs detached.
        runtime.spawn(worker.run());

        let core = AsyncClientCore::new(
            command_tx,
            pending_rx,
            #[cfg(feature = "traffic")]
            notifier_store,
        );
        Ok(Self { core })
    }
}
/// Builds a reusable connect factory for the endpoint `host:port`.
///
/// Every call of the returned factory clones `host` and yields a pinned,
/// boxed future that awaits `TokioTcpTransport::connect((host, port))`.
/// The future resolves to whatever that `connect` call returns.
#[cfg(feature = "network-tcp")]
fn make_tcp_factory(host: String, port: u16) -> ConnectFactory<TokioTcpTransport> {
    Box::new(move || {
        // Each produced future owns its own copy of the host name, so the
        // factory can be invoked repeatedly.
        let target_host = host.clone();
        Box::pin(async move {
            let endpoint = (target_host.as_str(), port);
            TokioTcpTransport::connect(endpoint).await
        })
    })
}