Skip to main content

mbus_async/client/
serial_client.rs

//! Async Modbus serial client.
//!
//! [`AsyncSerialClient`] is a thin wrapper around [`AsyncClientCore`] that adds
//! serial-specific constructors (RTU, ASCII, and injection of a custom
//! transport).  All Modbus request methods are inherited transparently through
//! the [`std::ops::Deref`] implementation that resolves to `AsyncClientCore`.
//!
//! # Note on pipeline depth
//!
//! Serial Modbus is a strict request-reply protocol, so the background task is
//! always built with a pipeline depth of 1 (`ClientTask::<_, 1>`).
12
13use std::ops::Deref;
14use std::sync::Arc;
15use std::time::Duration;
16
17use mbus_core::errors::MbusError;
18use mbus_core::transport::{AsyncTransport, ModbusConfig};
19#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
20use mbus_core::transport::{ModbusSerialConfig, SerialMode};
21#[cfg(feature = "serial-ascii")]
22use mbus_serial::TokioAsciiTransport;
23#[cfg(feature = "serial-rtu")]
24use mbus_serial::TokioRtuTransport;
25use tokio::sync::{mpsc, watch};
26
27use super::{AsyncClientCore, AsyncError};
28use crate::client::task::{ClientTask, ConnectFactory};
29
30/// Async Modbus serial client facade.
31///
32/// Supports both RTU and ASCII framing.  All Modbus request methods
33/// (`read_holding_registers`, `write_single_coil`, etc.) are available directly
34/// on this type via [`Deref`] to [`AsyncClientCore`].
35pub struct AsyncSerialClient {
36    core: AsyncClientCore,
37}
38
39impl Deref for AsyncSerialClient {
40    type Target = AsyncClientCore;
41
42    fn deref(&self) -> &Self::Target {
43        &self.core
44    }
45}
46
47// ── Constructors ──────────────────────────────────────────────────────────────
48
49impl AsyncSerialClient {
50    /// Deprecated constructor alias.
51    ///
52    /// Use [`AsyncSerialClient::new_rtu`] and then call `client.connect().await?`.
53    #[cfg(feature = "serial-rtu")]
54    #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
55    pub fn connect_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
56        Self::new_rtu(serial_config)
57    }
58
59    /// Deprecated constructor alias.
60    #[cfg(feature = "serial-rtu")]
61    #[deprecated(note = "use AsyncSerialClient::new_rtu(...) and then client.connect().await")]
62    pub fn connect_rtu_with_poll_interval(
63        serial_config: ModbusSerialConfig,
64        _poll_interval: Duration,
65    ) -> Result<Self, AsyncError> {
66        Self::new_rtu(serial_config)
67    }
68
69    /// Deprecated constructor alias.
70    #[cfg(feature = "serial-ascii")]
71    #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
72    pub fn connect_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
73        Self::new_ascii(serial_config)
74    }
75
76    /// Deprecated constructor alias.
77    #[cfg(feature = "serial-ascii")]
78    #[deprecated(note = "use AsyncSerialClient::new_ascii(...) and then client.connect().await")]
79    pub fn connect_ascii_with_poll_interval(
80        serial_config: ModbusSerialConfig,
81        _poll_interval: Duration,
82    ) -> Result<Self, AsyncError> {
83        Self::new_ascii(serial_config)
84    }
85
86    /// Deprecated constructor alias.
87    #[deprecated(
88        note = "use AsyncSerialClient::new_with_transport(...) and then client.connect().await"
89    )]
90    pub fn connect_with_transport<T>(
91        transport: T,
92        config: ModbusConfig,
93        poll_interval: Duration,
94    ) -> Result<Self, AsyncError>
95    where
96        T: AsyncTransport + Send + 'static,
97    {
98        Self::new_with_transport(transport, config, poll_interval)
99    }
100
101    /// Creates an async Modbus RTU serial client without connecting.
102    ///
103    /// Validates that `serial_config.mode` is [`SerialMode::Rtu`].
104    /// Call [`AsyncClientCore::connect`] on the returned client before sending
105    /// requests.
106    #[cfg(feature = "serial-rtu")]
107    pub fn new_rtu(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
108        if serial_config.mode != SerialMode::Rtu {
109            return Err(AsyncError::Mbus(MbusError::InvalidConfiguration));
110        }
111        make_rtu_client(ModbusConfig::Serial(serial_config))
112    }
113
114    /// Creates an async Modbus RTU serial client with a custom `poll_interval`.
115    ///
116    /// The poll interval is ignored in the async implementation.
117    #[cfg(feature = "serial-rtu")]
118    pub fn new_rtu_with_poll_interval(
119        serial_config: ModbusSerialConfig,
120        _poll_interval: Duration,
121    ) -> Result<Self, AsyncError> {
122        Self::new_rtu(serial_config)
123    }
124
125    /// Creates an async Modbus ASCII serial client without connecting.
126    ///
127    /// Validates that `serial_config.mode` is [`SerialMode::Ascii`].
128    /// Call [`AsyncClientCore::connect`] on the returned client before sending
129    /// requests.
130    #[cfg(feature = "serial-ascii")]
131    pub fn new_ascii(serial_config: ModbusSerialConfig) -> Result<Self, AsyncError> {
132        if serial_config.mode != SerialMode::Ascii {
133            return Err(AsyncError::Mbus(MbusError::InvalidConfiguration));
134        }
135        make_ascii_client(ModbusConfig::Serial(serial_config))
136    }
137
138    /// Creates an async Modbus ASCII serial client with a custom `poll_interval`.
139    ///
140    /// The poll interval is ignored in the async implementation.
141    #[cfg(feature = "serial-ascii")]
142    pub fn new_ascii_with_poll_interval(
143        serial_config: ModbusSerialConfig,
144        _poll_interval: Duration,
145    ) -> Result<Self, AsyncError> {
146        Self::new_ascii(serial_config)
147    }
148
149    /// Creates an async serial client from a caller-provided transport without
150    /// connecting.
151    ///
152    /// This is the escape hatch for custom serial drivers and integration tests
153    /// that inject a mock transport.  The `config` must be
154    /// `ModbusConfig::Serial(_)` or the call returns
155    /// `AsyncError::Mbus(MbusError::InvalidTransport)`.  Call
156    /// [`AsyncClientCore::connect`] on the returned client before sending
157    /// requests.
158    pub fn new_with_transport<T>(
159        transport: T,
160        config: ModbusConfig,
161        _poll_interval: Duration,
162    ) -> Result<Self, AsyncError>
163    where
164        T: AsyncTransport + Send + 'static,
165    {
166        if !matches!(config, ModbusConfig::Serial(_)) {
167            return Err(AsyncError::Mbus(MbusError::InvalidTransport));
168        }
169
170        // One-shot slot: the factory yields the transport on the first Connect,
171        // then signals ConnectionClosed if reconnection is attempted.
172        let slot = Arc::new(std::sync::Mutex::new(Some(transport)));
173        let connect_fn: ConnectFactory<T> = Box::new(move || {
174            let s = slot.clone();
175            Box::pin(async move { s.lock().unwrap().take().ok_or(MbusError::ConnectionClosed) })
176        });
177
178        spawn_serial_task(connect_fn)
179    }
180}
181
182// ── Internal helpers ──────────────────────────────────────────────────────────
183
184/// Spawns a [`ClientTask`] with the given factory and returns an
185/// [`AsyncSerialClient`] wired to it.
186fn spawn_serial_task<T: AsyncTransport + Send + 'static>(
187    connect_fn: ConnectFactory<T>,
188) -> Result<AsyncSerialClient, AsyncError> {
189    let handle = tokio::runtime::Handle::try_current().map_err(|_| AsyncError::WorkerClosed)?;
190    let (cmd_tx, cmd_rx) = mpsc::channel(64);
191    let (pending_count_tx, pending_count_rx) = watch::channel(0usize);
192
193    #[cfg(feature = "traffic")]
194    let notifier = crate::client::notifier::new_notifier_store();
195
196    let task = ClientTask::<T, 1>::new(
197        connect_fn,
198        cmd_rx,
199        pending_count_tx,
200        #[cfg(feature = "traffic")]
201        notifier.clone(),
202    );
203    handle.spawn(task.run());
204
205    Ok(AsyncSerialClient {
206        core: AsyncClientCore::new(
207            cmd_tx,
208            pending_count_rx,
209            #[cfg(feature = "traffic")]
210            notifier,
211        ),
212    })
213}
214
/// Returns a [`ConnectFactory`] for RTU framing.
///
/// Every invocation of the factory builds a new [`TokioRtuTransport`] from the
/// shared `config`.
#[cfg(feature = "serial-rtu")]
fn make_rtu_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioRtuTransport> {
    Box::new(move || {
        // Each returned future owns its own handle to the shared configuration.
        let shared_config = Arc::clone(&config);
        Box::pin(async move { TokioRtuTransport::new(&shared_config) })
    })
}
224
/// Returns a [`ConnectFactory`] for ASCII framing.
///
/// Every invocation of the factory builds a new [`TokioAsciiTransport`] from
/// the shared `config`.
#[cfg(feature = "serial-ascii")]
fn make_ascii_factory(config: Arc<ModbusConfig>) -> ConnectFactory<TokioAsciiTransport> {
    Box::new(move || {
        // Each returned future owns its own handle to the shared configuration.
        let shared_config = Arc::clone(&config);
        Box::pin(async move { TokioAsciiTransport::new(&shared_config) })
    })
}
234
/// Builds an [`AsyncSerialClient`] whose background task connects over RTU
/// using `config`.
#[cfg(feature = "serial-rtu")]
fn make_rtu_client(config: ModbusConfig) -> Result<AsyncSerialClient, AsyncError> {
    let shared_config = Arc::new(config);
    let factory = make_rtu_factory(shared_config);
    spawn_serial_task(factory)
}
240
/// Builds an [`AsyncSerialClient`] whose background task connects over ASCII
/// using `config`.
#[cfg(feature = "serial-ascii")]
fn make_ascii_client(config: ModbusConfig) -> Result<AsyncSerialClient, AsyncError> {
    let shared_config = Arc::new(config);
    let factory = make_ascii_factory(shared_config);
    spawn_serial_task(factory)
}