mbus-async
A pure-async Tokio-based Modbus client stack, with optional async server adapters.
It is recommended to use:
- modbus-rs for the complete package.
If you prefer to use individual crates, please note that I am planning to remove this crate in the future. The following crates will consolidate its functionality:
mbus-async drives Modbus communication natively in Tokio tasks. Each request is a Future
that resolves when the server responds. The transport layer is owned by a background Tokio task
and communicates with the public API through Tokio channels (mpsc, oneshot, and watch).
TCP Quick Start
Add to Cargo.toml:
[]
= { = "0.11.0", = ["network-tcp", "coils", "registers"] }
= { = "1", = ["full"] }
use AsyncTcpClient;
async
Serial Quick Start
[]
= { = "0.11.0", = false, = [
"async", "serial-rtu", "coils", "registers"
] }
= { = "1", = ["full"] }
use AsyncSerialClient;
use ;
use FromStr;
async
Design
┌─────────────────────────────────────────────────────────────────┐
│ Your async code │
│ client.read_holding_registers(1, 0, 10).await? │
└─────────────────────────┬───────────────────────────────────────┘
│ TaskCommand::Request (mpsc)
▼
┌─────────────────────────────────────────────────────────────────┐
│ ClientTask (Tokio task — tokio::spawn) │
│ │
│ tokio::select! { │
│ frame ← transport.recv_frame() (TCP / serial) │
│ cmd ← mpsc::Receiver<TaskCommand> │
│ } │
│ │
│ pending: HashMap<txn_id, (request, oneshot::Sender)> │
│ queued: VecDeque<TaskCommand> (if in_flight == N) │
└─────────────────────────┬───────────────────────────────────────┘
│ oneshot resolved
▼
┌─────────────────────────────────────────────────────────────────┐
│ Your async code resumes with the typed result │
└─────────────────────────────────────────────────────────────────┘
new()/new_rtu()creates a client and spawns a Tokio task (tokio::spawn(task.run())).- Each request method creates a
oneshotchannel, enqueues aTaskCommand::Request, andawaits the oneshot receiver. - The task drives
tokio::select!between receiving frames from the transport and receiving new commands from the public API. - On a complete response frame the task matches it to the pending entry by transaction id and resolves the caller's oneshot.
has_pending_requests()is a synchronous check (reads awatchchannel — no.await).- Dropping all handles closes the mpsc channel; the task exits cleanly.
TCP uses a compile-time pipeline depth N (AsyncTcpClient<const N: usize = 9>).
Serial is always depth 1 (request/reply protocol).
Features
| Feature | Default | Enables |
|---|---|---|
network-tcp |
✓ | AsyncTcpClient via mbus-network |
serial-rtu |
AsyncSerialClient with RTU framing |
|
serial-ascii |
AsyncSerialClient with ASCII framing |
|
coils |
✓ | Coil read/write methods |
registers |
✓ | Register read/write/mask methods |
discrete-inputs |
✓ | Discrete input read methods |
fifo |
✓ | FIFO queue read methods |
file-record |
✓ | File record read/write methods |
diagnostics |
✓ | Device identification, diagnostics, event log, etc. |
traffic |
Raw TX/RX frame callback API from the background async task | |
diagnostics-stats |
Async server diagnostics counters (depends on diagnostics) |
|
logging |
Enables log integration in this crate |
|
server-tcp |
server::AsyncTcpServer via mbus-network async transport |
|
server-serial |
server::AsyncRtuServer and server::AsyncAsciiServer |
|
full |
default + traffic |
Available Methods
AsyncTcpClient and AsyncSerialClient
Constructors are side-effect free. Build the client first, then call
client.connect().await? before issuing Modbus requests.
Both clients expose an identical async API:
| Method | FC | Feature |
|---|---|---|
read_multiple_coils(unit, address, quantity) |
01 | coils |
write_single_coil(unit, address, value) |
05 | coils |
write_multiple_coils(unit, address, &coils) |
15 | coils |
read_discrete_inputs(unit, address, quantity) |
02 | discrete-inputs |
read_holding_registers(unit, address, quantity) |
03 | registers |
read_input_registers(unit, address, quantity) |
04 | registers |
write_single_register(unit, address, value) |
06 | registers |
write_multiple_registers(unit, address, &values) |
16 | registers |
read_write_multiple_registers(unit, ra, rq, wa, &wv) |
23 | registers |
mask_write_register(unit, address, and_mask, or_mask) |
22 | registers |
read_fifo_queue(unit, address) |
24 | fifo |
read_file_record(unit, &sub_request) |
20 | file-record |
write_file_record(unit, &sub_request) |
21 | file-record |
read_device_identification(unit, code, object_id) |
43/14 | diagnostics |
encapsulated_interface_transport(unit, mei, &data) |
43 | diagnostics |
read_exception_status(unit) |
07 | diagnostics |
diagnostics(unit, sub_fn, &data) |
08 | diagnostics |
get_comm_event_counter(unit) |
11 | diagnostics |
get_comm_event_log(unit) |
12 | diagnostics |
report_server_id(unit) |
17 | diagnostics |
Serial-specific constructors
| Constructor | Mode |
|---|---|
AsyncSerialClient::new_rtu(config) |
RTU |
AsyncSerialClient::new_rtu_with_poll_interval(config, interval) |
RTU |
AsyncSerialClient::new_ascii(config) |
ASCII |
AsyncSerialClient::new_ascii_with_poll_interval(config, interval) |
ASCII |
Each constructor validates that ModbusSerialConfig::mode matches the constructor's expected mode, returning AsyncError::Mbus(MbusError::InvalidConfiguration) on mismatch.
TCP-specific constructors
Default pipeline (AsyncTcpClient<9>) constructors:
| Constructor | Notes |
|---|---|
AsyncTcpClient::new(host, port) |
Pipeline depth 9 |
AsyncTcpClient::new_with_poll_interval(host, port, interval) |
poll_interval ignored (pure-async) |
AsyncTcpClient::new_with_config(tcp_config, interval) |
Full ModbusTcpConfig |
Custom pipeline (AsyncTcpClient<N>) constructors:
| Constructor | Notes |
|---|---|
AsyncTcpClient::<N>::new_with_pipeline(host, port) |
Compile-time depth N |
AsyncTcpClient::<N>::new_with_pipeline_and_poll_interval(host, port, interval) |
poll_interval ignored |
AsyncTcpClient::<N>::new_with_config_and_pipeline(tcp_config, interval) |
Full config + depth N |
Error Handling
use AsyncError;
match client.read_holding_registers.await
Concurrency
Multiple concurrent .await calls are supported. Each gets an independent transaction id
and oneshot channel. The background Tokio task routes responses back by transaction id.
- TCP: up to
Nrequests in-flight simultaneously (defaultN = 9). Excess requests are queued internally and dispatched as pipeline slots free up. - Serial: exactly 1 in-flight request (Modbus serial is request/reply).
Example with custom TCP pipeline depth:
use AsyncTcpClient;
let client = new_with_pipeline?;
client.connect.await?;
Per-Request Timeout
Set a deadline applied to all subsequent requests:
use Duration;
client.set_request_timeout;
// Returns AsyncError::Timeout after 500ms with no response.
// The background task automatically drains the pipeline and closes the
// transport — call client.connect().await? to recover.
let result = client.read_multiple_coils.await;
client.clear_request_timeout; // back to "wait forever"
Checking Pending Requests
has_pending_requests() is synchronous — no .await required:
if client.has_pending_requests
Reconnect
After a transport error or timeout, call connect() again to restore the connection:
client.connect.await?; // safe to call repeatedly
Traffic Callback (optional traffic feature)
Enable traffic when you need raw frame observability in async apps:
[]
= { = "0.11.0", = false, = [
"async", "traffic", "network-tcp", "coils"
] }
= { = "1", = ["full"] }
use AsyncTcpClient;
use AsyncClientNotifier;
use ;
;
async
License
This crate is licensed under GPL-3.0 — see the repository root LICENSE.
Commercial licenses are also available for proprietary use; contact ch.raghava44@gmail.com.