mbus_async/client/
network_client.rs1use std::ops::Deref;
9#[cfg(feature = "network-tcp")]
10use std::time::Duration;
11
12#[cfg(feature = "network-tcp")]
13use mbus_core::transport::ModbusTcpConfig;
14#[cfg(feature = "network-tcp")]
15use mbus_network::TokioTcpTransport;
16#[cfg(feature = "network-tcp")]
17use tokio::sync::{mpsc, watch};
18
19use super::AsyncClientCore;
20#[cfg(feature = "network-tcp")]
21use super::AsyncError;
22#[cfg(feature = "network-tcp")]
23use crate::client::task::{ClientTask, ConnectFactory};
24
25pub struct AsyncTcpClient<const N: usize = 9> {
34 core: AsyncClientCore,
35}
36
37impl<const N: usize> Deref for AsyncTcpClient<N> {
38 type Target = AsyncClientCore;
39
40 fn deref(&self) -> &Self::Target {
41 &self.core
42 }
43}
44
45impl AsyncTcpClient<9> {
48 #[cfg(feature = "network-tcp")]
52 #[deprecated(note = "use AsyncTcpClient::new(...) and then client.connect().await")]
53 pub fn connect(host: &str, port: u16) -> Result<Self, AsyncError> {
54 Self::new(host, port)
55 }
56
57 #[cfg(feature = "network-tcp")]
61 #[deprecated(note = "use AsyncTcpClient::new(...) and then client.connect().await")]
62 pub fn connect_with_poll_interval(
63 host: &str,
64 port: u16,
65 _poll_interval: Duration,
66 ) -> Result<Self, AsyncError> {
67 Self::new(host, port)
68 }
69
70 #[cfg(feature = "network-tcp")]
75 pub fn new(host: &str, port: u16) -> Result<Self, AsyncError> {
76 Self::new_with_pipeline(host, port)
77 }
78
79 #[cfg(feature = "network-tcp")]
86 pub fn new_with_poll_interval(
87 host: &str,
88 port: u16,
89 _poll_interval: Duration,
90 ) -> Result<Self, AsyncError> {
91 Self::new(host, port)
92 }
93
94 #[cfg(feature = "network-tcp")]
100 pub fn new_with_config(
101 tcp_config: ModbusTcpConfig,
102 _poll_interval: Duration,
103 ) -> Result<Self, AsyncError> {
104 Self::from_connect_fn(make_tcp_factory(
105 tcp_config.host.as_str().to_string(),
106 tcp_config.port,
107 ))
108 }
109}
110
111impl<const N: usize> AsyncTcpClient<N> {
114 #[cfg(feature = "network-tcp")]
119 #[deprecated(
120 note = "use AsyncTcpClient::new_with_pipeline(...) and then client.connect().await"
121 )]
122 pub fn connect_with_pipeline(host: &str, port: u16) -> Result<Self, AsyncError> {
123 Self::new_with_pipeline(host, port)
124 }
125
126 #[cfg(feature = "network-tcp")]
131 #[deprecated(
132 note = "use AsyncTcpClient::new_with_pipeline_and_poll_interval(...) and then client.connect().await"
133 )]
134 pub fn connect_with_pipeline_and_poll_interval(
135 host: &str,
136 port: u16,
137 _poll_interval: Duration,
138 ) -> Result<Self, AsyncError> {
139 Self::new_with_pipeline(host, port)
140 }
141
142 #[cfg(feature = "network-tcp")]
147 pub fn new_with_pipeline(host: &str, port: u16) -> Result<Self, AsyncError> {
148 Self::from_connect_fn(make_tcp_factory(host.to_string(), port))
149 }
150
151 #[cfg(feature = "network-tcp")]
159 pub fn new_with_pipeline_and_poll_interval(
160 host: &str,
161 port: u16,
162 _poll_interval: Duration,
163 ) -> Result<Self, AsyncError> {
164 Self::new_with_pipeline(host, port)
165 }
166
167 #[cfg(feature = "network-tcp")]
173 pub fn new_with_config_and_pipeline(
174 tcp_config: ModbusTcpConfig,
175 _poll_interval: Duration,
176 ) -> Result<Self, AsyncError> {
177 Self::from_connect_fn(make_tcp_factory(
178 tcp_config.host.as_str().to_string(),
179 tcp_config.port,
180 ))
181 }
182
183 #[cfg(feature = "network-tcp")]
187 fn from_connect_fn(connect_fn: ConnectFactory<TokioTcpTransport>) -> Result<Self, AsyncError> {
188 let handle = tokio::runtime::Handle::try_current().map_err(|_| AsyncError::WorkerClosed)?;
189 let (cmd_tx, cmd_rx) = mpsc::channel(64);
190 let (pending_count_tx, pending_count_rx) = watch::channel(0usize);
191
192 #[cfg(feature = "traffic")]
193 let notifier = crate::client::notifier::new_notifier_store();
194
195 let task = ClientTask::<TokioTcpTransport, N>::new(
196 connect_fn,
197 cmd_rx,
198 pending_count_tx,
199 #[cfg(feature = "traffic")]
200 notifier.clone(),
201 );
202 handle.spawn(task.run());
203
204 Ok(Self {
205 core: AsyncClientCore::new(
206 cmd_tx,
207 pending_count_rx,
208 #[cfg(feature = "traffic")]
209 notifier,
210 ),
211 })
212 }
213}
214
/// Builds a reusable connection factory for `host:port`.
///
/// Each invocation of the returned factory clones `host` and yields a boxed
/// future that calls `TokioTcpTransport::connect` with `(host, port)`; nothing
/// is connected until that future is awaited.
#[cfg(feature = "network-tcp")]
fn make_tcp_factory(host: String, port: u16) -> ConnectFactory<TokioTcpTransport> {
    Box::new(move || {
        // The factory may be called repeatedly, so the future gets its own copy.
        let target_host = host.clone();
        Box::pin(async move {
            let address = (target_host.as_str(), port);
            TokioTcpTransport::connect(address).await
        })
    })
}