mbus_async/client/
serial_client.rs1use std::ops::Deref;
14use std::sync::Arc;
15use std::time::Duration;
16
17use mbus_core::errors::MbusError;
18use mbus_core::transport::{AsyncTransport, ModbusConfig};
19#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
20use mbus_core::transport::{ModbusSerialConfig, SerialMode};
21#[cfg(feature = "serial-ascii")]
22use mbus_serial::TokioAsciiTransport;
23#[cfg(feature = "serial-rtu")]
24use mbus_serial::TokioRtuTransport;
25use tokio::sync::{mpsc, watch};
26
27use super::{AsyncClientCore, AsyncError};
28use crate::client::task::{ClientTask, ConnectFactory};
29
30pub struct AsyncSerialClient {
36 core: AsyncClientCore,
37}
38
39impl Deref for AsyncSerialClient {
40 type Target = AsyncClientCore;
41
42 fn deref(&self) -> &Self::Target {
43 &self.core
44 }
45}
46
47impl AsyncSerialClient {
50 #[cfg(feature = "serial-rtu")]
54 #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
55 pub fn connect_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
56 Self::new_rtu(serial_config)
57 }
58
59 #[cfg(feature = "serial-rtu")]
61 #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
62 pub fn connect_rtu_with_poll_interval(
63 serial_config: ModbusSerialConfig,
64 _poll_interval: Duration,
65 ) -> Result<Self, AsyncError> {
66 Self::new_rtu(serial_config)
67 }
68
69 #[cfg(feature = "serial-ascii")]
71 #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
72 pub fn connect_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
73 Self::new_ascii(serial_config)
74 }
75
76 #[cfg(feature = "serial-ascii")]
78 #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
79 pub fn connect_ascii_with_poll_interval(
80 serial_config: ModbusSerialConfig,
81 _poll_interval: Duration,
82 ) -> Result<Self, AsyncError> {
83 Self::new_ascii(serial_config)
84 }
85
86 #[deprecated(
88 note = "use AsyncSerialClient::new_with_transport(...) and then client.connect().await"
89 )]
90 pub fn connect_with_transport<T>(
91 transport: T,
92 config: ModbusConfig,
93 poll_interval: Duration,
94 ) -> Result<Self, AsyncError>
95 where
96 T: AsyncTransport + Send + 'static,
97 {
98 Self::new_with_transport(transport, config, poll_interval)
99 }
100
101 #[cfg(feature = "serial-rtu")]
107 pub fn new_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
108 if serial_config.mode != SerialMode::Rtu {
109 return Err(AsyncError::Mbus(MbusError::InvalidConfiguration));
110 }
111 make_rtu_client(ModbusConfig::Serial(serial_config))
112 }
113
114 #[cfg(feature = "serial-rtu")]
118 pub fn new_rtu_with_poll_interval(
119 serial_config: ModbusSerialConfig,
120 _poll_interval: Duration,
121 ) -> Result<Self, AsyncError> {
122 Self::new_rtu(serial_config)
123 }
124
125 #[cfg(feature = "serial-ascii")]
131 pub fn new_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
132 if serial_config.mode != SerialMode::Ascii {
133 return Err(AsyncError::Mbus(MbusError::InvalidConfiguration));
134 }
135 make_ascii_client(ModbusConfig::Serial(serial_config))
136 }
137
138 #[cfg(feature = "serial-ascii")]
142 pub fn new_ascii_with_poll_interval(
143 serial_config: ModbusSerialConfig,
144 _poll_interval: Duration,
145 ) -> Result<Self, AsyncError> {
146 Self::new_ascii(serial_config)
147 }
148
149 pub fn new_with_transport<T>(
159 transport: T,
160 config: ModbusConfig,
161 _poll_interval: Duration,
162 ) -> Result<Self, AsyncError>
163 where
164 T: AsyncTransport + Send + 'static,
165 {
166 if !matches!(config, ModbusConfig::Serial(_)) {
167 return Err(AsyncError::Mbus(MbusError::InvalidTransport));
168 }
169
170 let slot = Arc::new(std::sync::Mutex::new(Some(transport)));
173 let connect_fn: ConnectFactory<T> = Box::new(move || {
174 let s = slot.clone();
175 Box::pin(async move { s.lock().unwrap().take().ok_or(MbusError::ConnectionClosed) })
176 });
177
178 spawn_serial_task(connect_fn)
179 }
180}
181
182fn spawn_serial_task<T: AsyncTransport + Send + 'static>(
187 connect_fn: ConnectFactory<T>,
188) -> Result<AsyncSerialClient, AsyncError> {
189 let handle = tokio::runtime::Handle::try_current().map_err(|_| AsyncError::WorkerClosed)?;
190 let (cmd_tx, cmd_rx) = mpsc::channel(64);
191 let (pending_count_tx, pending_count_rx) = watch::channel(0usize);
192
193 #[cfg(feature = "traffic")]
194 let notifier = crate::client::notifier::new_notifier_store();
195
196 let task = ClientTask::<T, 1>::new(
197 connect_fn,
198 cmd_rx,
199 pending_count_tx,
200 #[cfg(feature = "traffic")]
201 notifier.clone(),
202 );
203 handle.spawn(task.run());
204
205 Ok(AsyncSerialClient {
206 core: AsyncClientCore::new(
207 cmd_tx,
208 pending_count_rx,
209 #[cfg(feature = "traffic")]
210 notifier,
211 ),
212 })
213}
214
/// Builds a connect factory that constructs a fresh [`TokioRtuTransport`]
/// from the shared `config` each time it is invoked.
///
/// The configuration is kept behind an `Arc`, so every invocation only bumps
/// the reference count rather than copying the configuration.
#[cfg(feature = "serial-rtu")]
fn make_rtu_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioRtuTransport> {
    Box::new(move || {
        let shared_cfg = Arc::clone(&config);
        Box::pin(async move { TokioRtuTransport::new(&shared_cfg) })
    })
}
224
/// Builds a connect factory that constructs a fresh [`TokioAsciiTransport`]
/// from the shared `config` each time it is invoked.
///
/// The configuration is kept behind an `Arc`, so every invocation only bumps
/// the reference count rather than copying the configuration.
#[cfg(feature = "serial-ascii")]
fn make_ascii_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioAsciiTransport> {
    Box::new(move || {
        let shared_cfg = Arc::clone(&config);
        Box::pin(async move { TokioAsciiTransport::new(&shared_cfg) })
    })
}
234
/// Wraps `config` in an `Arc`, builds the RTU connect factory from it, and
/// spawns the serial client task with that factory.
///
/// Any error from `spawn_serial_task` is returned unchanged.
#[cfg(feature = "serial-rtu")]
fn make_rtu_client(config: ModbusConfig) -> Result<AsyncSerialClient, AsyncError> {
    let shared_config = Arc::new(config);
    let factory = make_rtu_factory(shared_config);
    spawn_serial_task(factory)
}
240
/// Wraps `config` in an `Arc`, builds the ASCII connect factory from it, and
/// spawns the serial client task with that factory.
///
/// Any error from `spawn_serial_task` is returned unchanged.
#[cfg(feature = "serial-ascii")]
fn make_ascii_client(config: ModbusConfig) -> Result<AsyncSerialClient, AsyncError> {
    let shared_config = Arc::new(config);
    let factory = make_ascii_factory(shared_config);
    spawn_serial_task(factory)
}