Skip to main content

mbus_async/client/
network_client.rs

1//! Async Modbus TCP client.
2//!
3//! [`AsyncTcpClient`] is a thin wrapper around [`AsyncClientCore`] that adds
4//! TCP-specific constructors. All Modbus request methods are inherited
5//! transparently through the [`std::ops::Deref`] implementation that resolves
6//! to `AsyncClientCore`.
7
8use std::ops::Deref;
9use std::time::Duration;
10
11#[cfg(feature = "tcp")]
12use mbus_core::transport::ModbusTcpConfig;
13#[cfg(feature = "tcp")]
14use mbus_network::TokioTcpTransport;
15use tokio::sync::{mpsc, watch};
16
17use super::{AsyncClientCore, AsyncError};
18use crate::client::task::{ClientTask, ConnectFactory};
19
20/// Async Modbus TCP client facade.
21///
22/// All Modbus request methods (`read_holding_registers`, `write_single_coil`,
23/// etc.) are available directly on this type via [`Deref`] to
24/// [`AsyncClientCore`].
25///
26/// The constant generic parameter `N` is the compile-time pipeline depth
27/// (default `9`).
28pub struct AsyncTcpClient<const N: usize = 9> {
29    core: AsyncClientCore,
30}
31
32impl<const N: usize> Deref for AsyncTcpClient<N> {
33    type Target = AsyncClientCore;
34
35    fn deref(&self) -> &Self::Target {
36        &self.core
37    }
38}
39
40// ── Default-pipeline constructors (N = 9) ───────────────────────────────────
41
42impl AsyncTcpClient<9> {
43    /// Deprecated constructor alias.
44    ///
45    /// Use [`AsyncTcpClient::new`] and then call `client.connect().await?`.
46    #[cfg(feature = "tcp")]
47    #[deprecated(note = "use AsyncTcpClient::new(...) and then client.connect().await")]
48    pub fn connect(host: &str, port: u16) -> Result<Self, AsyncError> {
49        Self::new(host, port)
50    }
51
52    /// Deprecated constructor alias.
53    ///
54    /// Use [`AsyncTcpClient::new`] and then call `client.connect().await?`.
55    #[cfg(feature = "tcp")]
56    #[deprecated(note = "use AsyncTcpClient::new(...) and then client.connect().await")]
57    pub fn connect_with_poll_interval(
58        host: &str,
59        port: u16,
60        _poll_interval: Duration,
61    ) -> Result<Self, AsyncError> {
62        Self::new(host, port)
63    }
64
65    /// Creates an async TCP client for `host`:`port` without connecting.
66    ///
67    /// Uses the default pipeline depth of 9. Call [`AsyncClientCore::connect`]
68    /// on the returned client before sending requests.
69    #[cfg(feature = "tcp")]
70    pub fn new(host: &str, port: u16) -> Result<Self, AsyncError> {
71        Self::new_with_pipeline(host, port)
72    }
73
74    /// Creates an async TCP client for `host`:`port` with a custom
75    /// `poll_interval`.
76    ///
77    /// The poll interval is ignored in the async implementation.
78    /// Uses the default pipeline depth of 9. Call [`AsyncClientCore::connect`]
79    /// on the returned client before sending requests.
80    #[cfg(feature = "tcp")]
81    pub fn new_with_poll_interval(
82        host: &str,
83        port: u16,
84        _poll_interval: Duration,
85    ) -> Result<Self, AsyncError> {
86        Self::new(host, port)
87    }
88
89    /// Creates an async TCP client with a fully custom [`ModbusTcpConfig`],
90    /// using the default pipeline depth of 9.
91    ///
92    /// Call [`AsyncClientCore::connect`] on the returned client before sending
93    /// requests.
94    #[cfg(feature = "tcp")]
95    pub fn new_with_config(
96        tcp_config: ModbusTcpConfig,
97        _poll_interval: Duration,
98    ) -> Result<Self, AsyncError> {
99        Self::from_connect_fn(make_tcp_factory(
100            tcp_config.host.as_str().to_string(),
101            tcp_config.port,
102        ))
103    }
104}
105
106// ── Configurable-pipeline constructors ───────────────────────────────────────
107
108impl<const N: usize> AsyncTcpClient<N> {
109    /// Deprecated constructor alias.
110    ///
111    /// Use [`AsyncTcpClient::new_with_pipeline`] and then call
112    /// `client.connect().await?`.
113    #[cfg(feature = "tcp")]
114    #[deprecated(
115        note = "use AsyncTcpClient::new_with_pipeline(...) and then client.connect().await"
116    )]
117    pub fn connect_with_pipeline(host: &str, port: u16) -> Result<Self, AsyncError> {
118        Self::new_with_pipeline(host, port)
119    }
120
121    /// Deprecated constructor alias.
122    ///
123    /// Use [`AsyncTcpClient::new_with_pipeline_and_poll_interval`] and then
124    /// call `client.connect().await?`.
125    #[cfg(feature = "tcp")]
126    #[deprecated(
127        note = "use AsyncTcpClient::new_with_pipeline_and_poll_interval(...) and then client.connect().await"
128    )]
129    pub fn connect_with_pipeline_and_poll_interval(
130        host: &str,
131        port: u16,
132        _poll_interval: Duration,
133    ) -> Result<Self, AsyncError> {
134        Self::new_with_pipeline(host, port)
135    }
136
137    /// Creates an async TCP client with compile-time pipeline depth `N`.
138    ///
139    /// Call [`AsyncClientCore::connect`] on the returned client before sending
140    /// requests.
141    #[cfg(feature = "tcp")]
142    pub fn new_with_pipeline(host: &str, port: u16) -> Result<Self, AsyncError> {
143        Self::from_connect_fn(make_tcp_factory(host.to_string(), port))
144    }
145
146    /// Creates an async TCP client with compile-time pipeline depth `N` and a
147    /// custom `poll_interval`.
148    ///
149    /// The poll interval is ignored in the async implementation.
150    ///
151    /// Call [`AsyncClientCore::connect`] on the returned client before sending
152    /// requests.
153    #[cfg(feature = "tcp")]
154    pub fn new_with_pipeline_and_poll_interval(
155        host: &str,
156        port: u16,
157        _poll_interval: Duration,
158    ) -> Result<Self, AsyncError> {
159        Self::new_with_pipeline(host, port)
160    }
161
162    /// Creates an async TCP client with a fully custom config and pipeline
163    /// depth `N`.
164    ///
165    /// Call [`AsyncClientCore::connect`] on the returned client before sending
166    /// requests.
167    #[cfg(feature = "tcp")]
168    pub fn new_with_config_and_pipeline(
169        tcp_config: ModbusTcpConfig,
170        _poll_interval: Duration,
171    ) -> Result<Self, AsyncError> {
172        Self::from_connect_fn(make_tcp_factory(
173            tcp_config.host.as_str().to_string(),
174            tcp_config.port,
175        ))
176    }
177
178    /// Internal constructor: wires a `ConnectFactory` into a spawned
179    /// [`ClientTask`] and wraps the resulting channels in an
180    /// [`AsyncClientCore`].
181    #[cfg(feature = "tcp")]
182    fn from_connect_fn(connect_fn: ConnectFactory<TokioTcpTransport>) -> Result<Self, AsyncError> {
183        let handle = tokio::runtime::Handle::try_current().map_err(|_| AsyncError::WorkerClosed)?;
184        let (cmd_tx, cmd_rx) = mpsc::channel(64);
185        let (pending_count_tx, pending_count_rx) = watch::channel(0usize);
186
187        #[cfg(feature = "traffic")]
188        let notifier = crate::client::notifier::new_notifier_store();
189
190        let task = ClientTask::<TokioTcpTransport, N>::new(
191            connect_fn,
192            cmd_rx,
193            pending_count_tx,
194            #[cfg(feature = "traffic")]
195            notifier.clone(),
196        );
197        handle.spawn(task.run());
198
199        Ok(Self {
200            core: AsyncClientCore::new(
201                cmd_tx,
202                pending_count_rx,
203                #[cfg(feature = "traffic")]
204                notifier,
205            ),
206        })
207    }
208}
209
210// ── Internal helpers ─────────────────────────────────────────────────────────
211
/// Produces a [`ConnectFactory`] whose every invocation yields a boxed future
/// that calls [`TokioTcpTransport::connect`] with `(host, port)`.
///
/// The host string is cloned on each invocation so the factory itself keeps
/// ownership and can be called repeatedly.
#[cfg(feature = "tcp")]
fn make_tcp_factory(host: String, port: u16) -> ConnectFactory<TokioTcpTransport> {
    Box::new(move || {
        // Each future needs its own owned copy of the host name.
        let owned_host = host.clone();
        Box::pin(async move {
            let address = (owned_host.as_str(), port);
            TokioTcpTransport::connect(address).await
        })
    })
}
219}