mbus_async/client/
serial_client.rs1use std::ops::Deref;
14use std::sync::Arc;
15use std::time::Duration;
16
17use mbus_core::errors::MbusError;
18use mbus_core::transport::{AsyncTransport, ModbusConfig};
19#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
20use mbus_core::transport::{ModbusSerialConfig, SerialMode};
21#[cfg(feature = "serial-ascii")]
22use mbus_serial::TokioAsciiTransport;
23#[cfg(feature = "serial-rtu")]
24use mbus_serial::TokioRtuTransport;
25use tokio::sync::{mpsc, watch};
26
27use super::{AsyncClientCore, AsyncError};
28use crate::client::task::{ClientTask, ConnectFactory};
29
30pub struct AsyncSerialClient<const ASCII: bool = false> {
36 core: AsyncClientCore,
37}
38
39impl<const ASCII: bool> Deref for AsyncSerialClient<ASCII> {
40 type Target = AsyncClientCore;
41
42 fn deref(&self) -> &Self::Target {
43 &self.core
44 }
45}
46
47impl AsyncSerialClient<false> {
50 #[cfg(feature = "serial-rtu")]
54 #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
55 pub fn connect_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
56 Self::new_rtu(serial_config)
57 }
58
59 #[cfg(feature = "serial-rtu")]
61 #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
62 pub fn connect_rtu_with_poll_interval(
63 serial_config: ModbusSerialConfig,
64 _poll_interval: Duration,
65 ) -> Result<Self, AsyncError> {
66 Self::new_rtu(serial_config)
67 }
68
69 #[cfg(feature = "serial-rtu")]
75 pub fn new_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
76 if serial_config.mode != SerialMode::Rtu {
77 return Err(AsyncError::Mbus(MbusError::InvalidConfiguration));
78 }
79 make_rtu_client(ModbusConfig::Serial(serial_config))
80 }
81
82 #[cfg(feature = "serial-rtu")]
86 pub fn new_rtu_with_poll_interval(
87 serial_config: ModbusSerialConfig,
88 _poll_interval: Duration,
89 ) -> Result<Self, AsyncError> {
90 Self::new_rtu(serial_config)
91 }
92}
93
94impl AsyncSerialClient<true> {
95 #[cfg(feature = "serial-ascii")]
97 #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
98 pub fn connect_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
99 Self::new_ascii(serial_config)
100 }
101
102 #[cfg(feature = "serial-ascii")]
104 #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
105 pub fn connect_ascii_with_poll_interval(
106 serial_config: ModbusSerialConfig,
107 _poll_interval: Duration,
108 ) -> Result<Self, AsyncError> {
109 Self::new_ascii(serial_config)
110 }
111
112 #[cfg(feature = "serial-ascii")]
118 pub fn new_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
119 if serial_config.mode != SerialMode::Ascii {
120 return Err(AsyncError::Mbus(MbusError::InvalidConfiguration));
121 }
122 make_ascii_client(ModbusConfig::Serial(serial_config))
123 }
124
125 #[cfg(feature = "serial-ascii")]
129 pub fn new_ascii_with_poll_interval(
130 serial_config: ModbusSerialConfig,
131 _poll_interval: Duration,
132 ) -> Result<Self, AsyncError> {
133 Self::new_ascii(serial_config)
134 }
135}
136
137impl<const ASCII: bool> AsyncSerialClient<ASCII> {
138 #[deprecated(
140 note = "use AsyncSerialClient::new_with_transport(...) and then client.connect().await"
141 )]
142 pub fn connect_with_transport<T>(
143 transport: T,
144 config: ModbusConfig,
145 poll_interval: Duration,
146 ) -> Result<Self, AsyncError>
147 where
148 T: AsyncTransport + Send + 'static,
149 {
150 Self::new_with_transport(transport, config, poll_interval)
151 }
152
153 pub fn new_with_transport<T>(
163 transport: T,
164 config: ModbusConfig,
165 _poll_interval: Duration,
166 ) -> Result<Self, AsyncError>
167 where
168 T: AsyncTransport + Send + 'static,
169 {
170 if !matches!(config, ModbusConfig::Serial(_)) {
171 return Err(AsyncError::Mbus(MbusError::InvalidTransport));
172 }
173
174 let slot = Arc::new(std::sync::Mutex::new(Some(transport)));
177 let connect_fn: ConnectFactory<T> = Box::new(move || {
178 let s = slot.clone();
179 Box::pin(async move { s.lock().unwrap().take().ok_or(MbusError::ConnectionClosed) })
180 });
181
182 spawn_serial_task(connect_fn)
183 }
184}
185
186fn spawn_serial_task<T: AsyncTransport + Send + 'static, const ASCII: bool>(
191 connect_fn: ConnectFactory<T>,
192) -> Result<AsyncSerialClient<ASCII>, AsyncError> {
193 let handle = tokio::runtime::Handle::try_current().map_err(|_| AsyncError::WorkerClosed)?;
194 let (cmd_tx, cmd_rx) = mpsc::channel(64);
195 let (pending_count_tx, pending_count_rx) = watch::channel(0usize);
196
197 #[cfg(feature = "traffic")]
198 let notifier = crate::client::notifier::new_notifier_store();
199
200 let task = ClientTask::<T, 1>::new(
201 connect_fn,
202 cmd_rx,
203 pending_count_tx,
204 #[cfg(feature = "traffic")]
205 notifier.clone(),
206 );
207 handle.spawn(task.run());
208
209 Ok(AsyncSerialClient {
210 core: AsyncClientCore::new(
211 cmd_tx,
212 pending_count_rx,
213 #[cfg(feature = "traffic")]
214 notifier,
215 ),
216 })
217}
218
/// Builds a connect factory that constructs a fresh [`TokioRtuTransport`]
/// from the shared `config` each time it is invoked.
#[cfg(feature = "serial-rtu")]
fn make_rtu_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioRtuTransport> {
    Box::new(move || {
        // Each returned future owns its own reference to the configuration.
        let shared = Arc::clone(&config);
        Box::pin(async move { TokioRtuTransport::new(&shared) })
    })
}
228
/// Builds a connect factory that constructs a fresh [`TokioAsciiTransport`]
/// from the shared `config` each time it is invoked.
#[cfg(feature = "serial-ascii")]
fn make_ascii_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioAsciiTransport> {
    Box::new(move || {
        // Each returned future owns its own reference to the configuration.
        let shared = Arc::clone(&config);
        Box::pin(async move { TokioAsciiTransport::new(&shared) })
    })
}
238
/// Creates an RTU client: wraps `config` in an `Arc`, builds the RTU connect
/// factory from it and spawns the background task.
///
/// # Errors
///
/// Propagates any error returned by `spawn_serial_task`.
#[cfg(feature = "serial-rtu")]
fn make_rtu_client(config: ModbusConfig) -> Result<AsyncSerialClient<false>, AsyncError> {
    let shared_config = Arc::new(config);
    let factory = make_rtu_factory(shared_config);
    spawn_serial_task(factory)
}
244
/// Creates an ASCII client: wraps `config` in an `Arc`, builds the ASCII
/// connect factory from it and spawns the background task.
///
/// # Errors
///
/// Propagates any error returned by `spawn_serial_task`.
#[cfg(feature = "serial-ascii")]
fn make_ascii_client(config: ModbusConfig) -> Result<AsyncSerialClient<true>, AsyncError> {
    let shared_config = Arc::new(config);
    let factory = make_ascii_factory(shared_config);
    spawn_serial_task(factory)
}
250
251pub type AsyncRtuClient = AsyncSerialClient<false>;
253pub type AsyncAsciiClient = AsyncSerialClient<true>;
255
/// Serial client whose framing flavour is chosen at run time.
///
/// Each variant exists only when its cargo feature is enabled.
#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
pub enum AsyncSerialClientKind {
    /// An RTU client.
    #[cfg(feature = "serial-rtu")]
    Rtu(AsyncRtuClient),
    /// An ASCII client.
    #[cfg(feature = "serial-ascii")]
    Ascii(AsyncAsciiClient),
}
269
/// Exposes the [`AsyncClientCore`] of whichever variant is held.
#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
impl Deref for AsyncSerialClientKind {
    type Target = AsyncClientCore;

    /// Borrows the client core of the active variant.
    fn deref(&self) -> &AsyncClientCore {
        match self {
            #[cfg(feature = "serial-rtu")]
            Self::Rtu(AsyncSerialClient { core }) => core,
            #[cfg(feature = "serial-ascii")]
            Self::Ascii(AsyncSerialClient { core }) => core,
        }
    }
}