mbus_async/client/
network_client.rs1use std::ops::Deref;
9use std::time::Duration;
10
11#[cfg(feature = "tcp")]
12use mbus_core::transport::ModbusTcpConfig;
13#[cfg(feature = "tcp")]
14use mbus_network::TokioTcpTransport;
15use tokio::sync::{mpsc, watch};
16
17use super::{AsyncClientCore, AsyncError};
18use crate::client::task::{ClientTask, ConnectFactory};
19
20pub struct AsyncTcpClient<const N: usize = 9> {
29 core: AsyncClientCore,
30}
31
32impl<const N: usize> Deref for AsyncTcpClient<N> {
33 type Target = AsyncClientCore;
34
35 fn deref(&self) -> &Self::Target {
36 &self.core
37 }
38}
39
40impl AsyncTcpClient<9> {
43 #[cfg(feature = "tcp")]
47 #[deprecated(note = "use AsyncTcpClient::new(...) and then client.connect().await")]
48 pub fn connect(host: &str, port: u16) -> Result<Self, AsyncError> {
49 Self::new(host, port)
50 }
51
52 #[cfg(feature = "tcp")]
56 #[deprecated(note = "use AsyncTcpClient::new(...) and then client.connect().await")]
57 pub fn connect_with_poll_interval(
58 host: &str,
59 port: u16,
60 _poll_interval: Duration,
61 ) -> Result<Self, AsyncError> {
62 Self::new(host, port)
63 }
64
65 #[cfg(feature = "tcp")]
70 pub fn new(host: &str, port: u16) -> Result<Self, AsyncError> {
71 Self::new_with_pipeline(host, port)
72 }
73
74 #[cfg(feature = "tcp")]
81 pub fn new_with_poll_interval(
82 host: &str,
83 port: u16,
84 _poll_interval: Duration,
85 ) -> Result<Self, AsyncError> {
86 Self::new(host, port)
87 }
88
89 #[cfg(feature = "tcp")]
95 pub fn new_with_config(
96 tcp_config: ModbusTcpConfig,
97 _poll_interval: Duration,
98 ) -> Result<Self, AsyncError> {
99 Self::from_connect_fn(make_tcp_factory(
100 tcp_config.host.as_str().to_string(),
101 tcp_config.port,
102 ))
103 }
104}
105
106impl<const N: usize> AsyncTcpClient<N> {
109 #[cfg(feature = "tcp")]
114 #[deprecated(
115 note = "use AsyncTcpClient::new_with_pipeline(...) and then client.connect().await"
116 )]
117 pub fn connect_with_pipeline(host: &str, port: u16) -> Result<Self, AsyncError> {
118 Self::new_with_pipeline(host, port)
119 }
120
121 #[cfg(feature = "tcp")]
126 #[deprecated(
127 note = "use AsyncTcpClient::new_with_pipeline_and_poll_interval(...) and then client.connect().await"
128 )]
129 pub fn connect_with_pipeline_and_poll_interval(
130 host: &str,
131 port: u16,
132 _poll_interval: Duration,
133 ) -> Result<Self, AsyncError> {
134 Self::new_with_pipeline(host, port)
135 }
136
137 #[cfg(feature = "tcp")]
142 pub fn new_with_pipeline(host: &str, port: u16) -> Result<Self, AsyncError> {
143 Self::from_connect_fn(make_tcp_factory(host.to_string(), port))
144 }
145
146 #[cfg(feature = "tcp")]
154 pub fn new_with_pipeline_and_poll_interval(
155 host: &str,
156 port: u16,
157 _poll_interval: Duration,
158 ) -> Result<Self, AsyncError> {
159 Self::new_with_pipeline(host, port)
160 }
161
162 #[cfg(feature = "tcp")]
168 pub fn new_with_config_and_pipeline(
169 tcp_config: ModbusTcpConfig,
170 _poll_interval: Duration,
171 ) -> Result<Self, AsyncError> {
172 Self::from_connect_fn(make_tcp_factory(
173 tcp_config.host.as_str().to_string(),
174 tcp_config.port,
175 ))
176 }
177
178 #[cfg(feature = "tcp")]
182 fn from_connect_fn(connect_fn: ConnectFactory<TokioTcpTransport>) -> Result<Self, AsyncError> {
183 let handle = tokio::runtime::Handle::try_current().map_err(|_| AsyncError::WorkerClosed)?;
184 let (cmd_tx, cmd_rx) = mpsc::channel(64);
185 let (pending_count_tx, pending_count_rx) = watch::channel(0usize);
186
187 #[cfg(feature = "traffic")]
188 let notifier = crate::client::notifier::new_notifier_store();
189
190 let task = ClientTask::<TokioTcpTransport, N>::new(
191 connect_fn,
192 cmd_rx,
193 pending_count_tx,
194 #[cfg(feature = "traffic")]
195 notifier.clone(),
196 );
197 handle.spawn(task.run());
198
199 Ok(Self {
200 core: AsyncClientCore::new(
201 cmd_tx,
202 pending_count_rx,
203 #[cfg(feature = "traffic")]
204 notifier,
205 ),
206 })
207 }
208}
209
/// Builds a reusable connect factory for `host:port`.
///
/// Each call to the returned closure clones the host string and yields a
/// boxed, pinned future that awaits `TokioTcpTransport::connect` with the
/// `(host, port)` pair. The clone keeps the closure callable more than once.
#[cfg(feature = "tcp")]
fn make_tcp_factory(host: String, port: u16) -> ConnectFactory<TokioTcpTransport> {
    Box::new(move || {
        // The future must own its host, so give every attempt a fresh copy.
        let target = host.clone();
        Box::pin(async move {
            let addr = (target.as_str(), port);
            TokioTcpTransport::connect(addr).await
        })
    })
}