Skip to main content

mbus_async/client/
serial_client.rs

1//! Async Modbus serial client.
2//!
3//! [`AsyncSerialClient`] is a thin wrapper around [`AsyncClientCore`] that adds
4//! serial-specific constructors (RTU, ASCII, and injection of a custom
5//! transport).  All Modbus request methods are inherited transparently through
6//! the [`std::ops::Deref`] implementation that resolves to `AsyncClientCore`.
7//!
8//! # Note on pipeline depth
9//!
10//! Serial Modbus is a strict request-reply protocol, so the background task is
11//! always built with a pipeline depth of 1 (`ClientTask::<_, 1>`).
12
13use std::ops::Deref;
14use std::sync::Arc;
15use std::time::Duration;
16
17use mbus_core::errors::MbusError;
18use mbus_core::transport::{AsyncTransport, ModbusConfig};
19#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
20use mbus_core::transport::{ModbusSerialConfig, SerialMode};
21#[cfg(feature = "serial-ascii")]
22use mbus_serial::TokioAsciiTransport;
23#[cfg(feature = "serial-rtu")]
24use mbus_serial::TokioRtuTransport;
25use tokio::sync::{mpsc, watch};
26
27use super::{AsyncClientCore, AsyncError};
28use crate::client::task::{ClientTask, ConnectFactory};
29
30/// Async Modbus serial client facade.
31///
32/// Supports both RTU and ASCII framing.  All Modbus request methods
33/// (`read_holding_registers`, `write_single_coil`, etc.) are available directly
34/// on this type via [`Deref`] to [`AsyncClientCore`].
35pub struct AsyncSerialClient<const ASCII: bool = false> {
36    core: AsyncClientCore,
37}
38
39impl<const ASCII: bool> Deref for AsyncSerialClient<ASCII> {
40    type Target = AsyncClientCore;
41
42    fn deref(&self) -> &Self::Target {
43        &self.core
44    }
45}
46
47// ── Constructors ──────────────────────────────────────────────────────────────
48
49impl AsyncSerialClient<false> {
50    /// Deprecated constructor alias.
51    ///
52    /// Use [`AsyncSerialClient::new_rtu`] and then call `client.connect().await?`.
53    #[cfg(feature = "serial-rtu")]
54    #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
55    pub fn connect_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
56        Self::new_rtu(serial_config)
57    }
58
59    /// Deprecated constructor alias.
60    #[cfg(feature = "serial-rtu")]
61    #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
62    pub fn connect_rtu_with_poll_interval(
63        serial_config: ModbusSerialConfig,
64        _poll_interval: Duration,
65    ) -> Result<Self, AsyncError> {
66        Self::new_rtu(serial_config)
67    }
68
69    /// Creates an async Modbus RTU serial client without connecting.
70    ///
71    /// Validates that `serial_config.mode` is [`SerialMode::Rtu`].
72    /// Call [`AsyncClientCore::connect`] on the returned client before sending
73    /// requests.
74    #[cfg(feature = "serial-rtu")]
75    pub fn new_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
76        if serial_config.mode != SerialMode::Rtu {
77            return Err(AsyncError::Mbus(MbusError::InvalidConfiguration));
78        }
79        make_rtu_client(ModbusConfig::Serial(serial_config))
80    }
81
82    /// Creates an async Modbus RTU serial client with a custom `poll_interval`.
83    ///
84    /// The poll interval is ignored in the async implementation.
85    #[cfg(feature = "serial-rtu")]
86    pub fn new_rtu_with_poll_interval(
87        serial_config: ModbusSerialConfig,
88        _poll_interval: Duration,
89    ) -> Result<Self, AsyncError> {
90        Self::new_rtu(serial_config)
91    }
92}
93
94impl AsyncSerialClient<true> {
95    /// Deprecated constructor alias.
96    #[cfg(feature = "serial-ascii")]
97    #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
98    pub fn connect_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
99        Self::new_ascii(serial_config)
100    }
101
102    /// Deprecated constructor alias.
103    #[cfg(feature = "serial-ascii")]
104    #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
105    pub fn connect_ascii_with_poll_interval(
106        serial_config: ModbusSerialConfig,
107        _poll_interval: Duration,
108    ) -> Result<Self, AsyncError> {
109        Self::new_ascii(serial_config)
110    }
111
112    /// Creates an async Modbus ASCII serial client without connecting.
113    ///
114    /// Validates that `serial_config.mode` is [`SerialMode::Ascii`].
115    /// Call [`AsyncClientCore::connect`] on the returned client before sending
116    /// requests.
117    #[cfg(feature = "serial-ascii")]
118    pub fn new_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
119        if serial_config.mode != SerialMode::Ascii {
120            return Err(AsyncError::Mbus(MbusError::InvalidConfiguration));
121        }
122        make_ascii_client(ModbusConfig::Serial(serial_config))
123    }
124
125    /// Creates an async Modbus ASCII serial client with a custom `poll_interval`.
126    ///
127    /// The poll interval is ignored in the async implementation.
128    #[cfg(feature = "serial-ascii")]
129    pub fn new_ascii_with_poll_interval(
130        serial_config: ModbusSerialConfig,
131        _poll_interval: Duration,
132    ) -> Result<Self, AsyncError> {
133        Self::new_ascii(serial_config)
134    }
135}
136
137impl<const ASCII: bool> AsyncSerialClient<ASCII> {
138    /// Deprecated constructor alias.
139    #[deprecated(
140        note = "use AsyncSerialClient::new_with_transport(...) and then client.connect().await"
141    )]
142    pub fn connect_with_transport<T>(
143        transport: T,
144        config: ModbusConfig,
145        poll_interval: Duration,
146    ) -> Result<Self, AsyncError>
147    where
148        T: AsyncTransport + Send + 'static,
149    {
150        Self::new_with_transport(transport, config, poll_interval)
151    }
152
153    /// Creates an async serial client from a caller-provided transport without
154    /// connecting.
155    ///
156    /// This is the escape hatch for custom serial drivers and integration tests
157    /// that inject a mock transport.  The `config` must be
158    /// `ModbusConfig::Serial(_)` or the call returns
159    /// `AsyncError::Mbus(MbusError::InvalidTransport)`.  Call
160    /// [`AsyncClientCore::connect`] on the returned client before sending
161    /// requests.
162    pub fn new_with_transport<T>(
163        transport: T,
164        config: ModbusConfig,
165        _poll_interval: Duration,
166    ) -> Result<Self, AsyncError>
167    where
168        T: AsyncTransport + Send + 'static,
169    {
170        if !matches!(config, ModbusConfig::Serial(_)) {
171            return Err(AsyncError::Mbus(MbusError::InvalidTransport));
172        }
173
174        // One-shot slot: the factory yields the transport on the first Connect,
175        // then signals ConnectionClosed if reconnection is attempted.
176        let slot = Arc::new(std::sync::Mutex::new(Some(transport)));
177        let connect_fn: ConnectFactory<T> = Box::new(move || {
178            let s = slot.clone();
179            Box::pin(async move { s.lock().unwrap().take().ok_or(MbusError::ConnectionClosed) })
180        });
181
182        spawn_serial_task(connect_fn)
183    }
184}
185
186// ── Internal helpers ──────────────────────────────────────────────────────────
187
188/// Spawns a [`ClientTask`] with the given factory and returns an
189/// [`AsyncSerialClient`] wired to it.
190fn spawn_serial_task<T: AsyncTransport + Send + 'static, const ASCII: bool>(
191    connect_fn: ConnectFactory<T>,
192) -> Result<AsyncSerialClient<ASCII>, AsyncError> {
193    let handle = tokio::runtime::Handle::try_current().map_err(|_| AsyncError::WorkerClosed)?;
194    let (cmd_tx, cmd_rx) = mpsc::channel(64);
195    let (pending_count_tx, pending_count_rx) = watch::channel(0usize);
196
197    #[cfg(feature = "traffic")]
198    let notifier = crate::client::notifier::new_notifier_store();
199
200    let task = ClientTask::<T, 1>::new(
201        connect_fn,
202        cmd_rx,
203        pending_count_tx,
204        #[cfg(feature = "traffic")]
205        notifier.clone(),
206    );
207    handle.spawn(task.run());
208
209    Ok(AsyncSerialClient {
210        core: AsyncClientCore::new(
211            cmd_tx,
212            pending_count_rx,
213            #[cfg(feature = "traffic")]
214            notifier,
215        ),
216    })
217}
218
/// Returns a [`ConnectFactory`] for RTU transports.
///
/// Every invocation of the factory constructs a new [`TokioRtuTransport`]
/// from the shared `config`.
#[cfg(feature = "serial-rtu")]
fn make_rtu_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioRtuTransport> {
    Box::new(move || {
        // Each returned future gets its own handle on the shared config.
        let shared = Arc::clone(&config);
        let opening = async move { TokioRtuTransport::new(&shared) };
        Box::pin(opening)
    })
}
228
/// Returns a [`ConnectFactory`] for ASCII transports.
///
/// Every invocation of the factory constructs a new [`TokioAsciiTransport`]
/// from the shared `config`.
#[cfg(feature = "serial-ascii")]
fn make_ascii_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioAsciiTransport> {
    Box::new(move || {
        // Each returned future gets its own handle on the shared config.
        let shared = Arc::clone(&config);
        let opening = async move { TokioAsciiTransport::new(&shared) };
        Box::pin(opening)
    })
}
238
/// Assembles an RTU [`AsyncSerialClient`]: wraps `config` in an [`Arc`],
/// builds the RTU connect factory from it, and spawns the background task.
#[cfg(feature = "serial-rtu")]
fn make_rtu_client(config: ModbusConfig) -> Result<AsyncSerialClient<false>, AsyncError> {
    let shared_config = Arc::new(config);
    let factory = make_rtu_factory(shared_config);
    spawn_serial_task(factory)
}
244
/// Assembles an ASCII [`AsyncSerialClient`]: wraps `config` in an [`Arc`],
/// builds the ASCII connect factory from it, and spawns the background task.
#[cfg(feature = "serial-ascii")]
fn make_ascii_client(config: ModbusConfig) -> Result<AsyncSerialClient<true>, AsyncError> {
    let shared_config = Arc::new(config);
    let factory = make_ascii_factory(shared_config);
    spawn_serial_task(factory)
}
250
251/// Modbus RTU async client.
252pub type AsyncRtuClient = AsyncSerialClient<false>;
253/// Modbus ASCII async client.
254pub type AsyncAsciiClient = AsyncSerialClient<true>;
255
/// Runtime-selected serial client: holds either an RTU or an ASCII client.
///
/// The enum dereferences to [`AsyncClientCore`], so callers can issue
/// requests without matching on the framing mode.  Each variant exists only
/// when its corresponding cargo feature is enabled.
#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
pub enum AsyncSerialClientKind {
    /// Client using RTU framing.
    #[cfg(feature = "serial-rtu")]
    Rtu(AsyncRtuClient),
    /// Client using ASCII framing.
    #[cfg(feature = "serial-ascii")]
    Ascii(AsyncAsciiClient),
}
269
#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
impl Deref for AsyncSerialClientKind {
    type Target = AsyncClientCore;

    /// Borrows the [`AsyncClientCore`] of whichever client variant is held.
    fn deref(&self) -> &AsyncClientCore {
        match self {
            #[cfg(feature = "serial-rtu")]
            Self::Rtu(rtu) => rtu.deref(),
            #[cfg(feature = "serial-ascii")]
            Self::Ascii(ascii) => ascii.deref(),
        }
    }
}