mqtt-rs
MQTT driver for epics-rs — publish/subscribe MQTT topics as EPICS records.
An asyn PortDriver that bridges MQTT brokers to the EPICS record layer. Incoming MQTT messages update PVs via I/O Intr scanning; EPICS output record writes are published to the broker.
Inspired by epicsMQTT (C++ MQTT support for EPICS using autoparamDriver + Paho). This is an independent Rust implementation, not a port.
Features
- Generic MQTT — works with any MQTT broker and topic structure
- FLAT payloads — single values: INT, FLOAT, DIGITAL, STRING, INTARRAY, FLOATARRAY
- JSON payloads — extract nested fields via dot-path (e.g. sensor.temperature)
- Bidirectional — input records subscribe, output records publish
- Auto-reconnect — rumqttc handles broker reconnection transparently
- Connection status PV — bi record with alarm on disconnect; tracked via ConnAck and inbound activity (Publish / PingResp) so the PV cannot get stuck at 0 after a recoverable rumqttc error
- Zigbee2MQTT builders — optional device type builders for Z2M (Plug, Light, Switch, TempSensor, Motion, Remote)
Topic Address Format
Records reference MQTT topics through the asyn drvInfo string:
FORMAT:TYPE topic/name [json.field.path]
| Field | Values |
|---|---|
| FORMAT | FLAT, JSON |
| TYPE | INT, FLOAT, DIGITAL, STRING, INTARRAY, FLOATARRAY |
| topic | MQTT topic (no wildcards, spaces allowed) |
| json.field.path | Dot-separated path for JSON payloads (required for JSON, forbidden for FLAT) |
Examples:
FLAT:FLOAT sensors/temperature
FLAT:INT sensors/counter
FLAT:STRING device/status
JSON:FLOAT sensors/environment humidity
JSON:INT sensors/data reading.value
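The address grammar above can be sketched as a small parser. This is only an illustration under stated assumptions, not the crate's actual code: `Format`, `ValueType`, `TopicAddress`, and `parse_drv_info` are hypothetical names, and the rule "for JSON, the last whitespace-separated token is the field path" is one plausible way to reconcile topics that contain spaces with the trailing path.

```rust
/// Payload encoding named before the colon in a drvInfo string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Format {
    Flat,
    Json,
}

/// Value type named after the colon.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValueType {
    Int,
    Float,
    Digital,
    Str,
    IntArray,
    FloatArray,
}

/// Parsed form of `FORMAT:TYPE topic/name [json.field.path]`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicAddress {
    pub format: Format,
    pub value_type: ValueType,
    pub topic: String,
    pub json_path: Option<String>,
}

/// Hypothetical parser (an assumption for illustration, not the crate's API).
pub fn parse_drv_info(input: &str) -> Result<TopicAddress, String> {
    // Split "FORMAT:TYPE" from the remainder at the first whitespace.
    let (head, rest) = input
        .trim()
        .split_once(char::is_whitespace)
        .ok_or_else(|| format!("missing topic in '{}'", input))?;
    let (fmt, ty) = head
        .split_once(':')
        .ok_or_else(|| format!("expected FORMAT:TYPE, got '{}'", head))?;

    let format = match fmt {
        "FLAT" => Format::Flat,
        "JSON" => Format::Json,
        other => return Err(format!("unknown format '{}'", other)),
    };
    let value_type = match ty {
        "INT" => ValueType::Int,
        "FLOAT" => ValueType::Float,
        "DIGITAL" => ValueType::Digital,
        "STRING" => ValueType::Str,
        "INTARRAY" => ValueType::IntArray,
        "FLOATARRAY" => ValueType::FloatArray,
        other => return Err(format!("unknown type '{}'", other)),
    };

    let rest = rest.trim();
    let (topic, json_path) = match format {
        // FLAT: a field path is forbidden, so everything left is the topic.
        Format::Flat => (rest, None),
        // JSON: the last token is the required path (assumed rule);
        // the topic itself may still contain spaces.
        Format::Json => {
            let (topic, path) = rest
                .rsplit_once(char::is_whitespace)
                .ok_or_else(|| "JSON address requires a field path".to_string())?;
            (topic.trim_end(), Some(path.to_string()))
        }
    };

    if topic.is_empty() {
        return Err("empty topic".to_string());
    }
    // '+' and '#' are the MQTT wildcard characters.
    if topic.contains('#') || topic.contains('+') {
        return Err(format!("wildcards are not allowed in topic '{}'", topic));
    }
    Ok(TopicAddress {
        format,
        value_type,
        topic: topic.to_string(),
        json_path,
    })
}
```

Under these assumptions, `parse_drv_info("JSON:INT sensors/data reading.value")` yields the topic `sensors/data` with the path `reading.value`, while `JSON:FLOAT sensors/environment` is rejected because the field path is missing.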
Generic MQTT Usage
For any MQTT broker and topic structure — no Z2M dependency.
Database (.db file)
record(ai, "$(P)Temperature") {
field(DTYP, "asynFloat64")
field(INP, "@asyn($(PORT)) FLAT:FLOAT sensors/temperature")
field(SCAN, "I/O Intr")
field(EGU, "degC")
field(PREC, "2")
}
record(ao, "$(P)Setpoint") {
field(DTYP, "asynFloat64")
field(OUT, "@asyn($(PORT)) FLAT:FLOAT actuators/setpoint")
}
record(ai, "$(P)Humidity") {
field(DTYP, "asynFloat64")
field(INP, "@asyn($(PORT)) JSON:FLOAT sensors/environment humidity")
field(SCAN, "I/O Intr")
}
Startup Script
# Register topics
)
)
)
# Create driver with optional connection status PV
)
# Load records
)
IOC Binary
use register_mqtt_commands;
let trace = new;
let handle = runtime_handle;
let mut app = new;
app = register_asyn_device_support;
app = register_mqtt_commands;
app.startup_script
.run
.await
Zigbee2MQTT Builders
Optional device type builders that auto-register topics AND create EPICS records — no .db file needed. Each builder knows the Z2M JSON payload structure for its device type.
Z2M-specific behavior:
- ON/OFF normalization on /set state topics: "1"/"on"/"true" → "ON", "0"/"off"/"false" → "OFF"
- This normalization only applies to Z2M builder topics, not generic MQTT topics
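The normalization rule can be sketched as a single function. `normalize_z2m_state` is a hypothetical name, and two behaviors here are assumptions rather than documented facts: matching is case-insensitive, and values outside the listed spellings are passed through unchanged.

```rust
/// Hypothetical helper (not the crate's API): map the listed truthy/falsy
/// spellings to the "ON"/"OFF" strings used on Z2M `/set` topics.
pub fn normalize_z2m_state(value: &str) -> &str {
    let v = value.trim();
    // Assumption: comparison ignores ASCII case, so "On" and "TRUE" also match.
    if ["1", "on", "true"].iter().any(|s| v.eq_ignore_ascii_case(s)) {
        "ON"
    } else if ["0", "off", "false"].iter().any(|s| v.eq_ignore_ascii_case(s)) {
        "OFF"
    } else {
        // Assumption: anything else is published as written.
        v
    }
}
```

A caller would apply this only to payloads bound for Z2M builder topics, leaving generic MQTT output records untouched, in line with the second bullet above.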
Startup Script (Z2M)
)
)
)
)
)
)
)
Z2M Device Types
| Command | Records Created | Fields |
|---|---|---|
| mqttZ2mPlug | ai, ai, longin, stringin, stringout | power, energy, device_temp, state, set_state |
| mqttZ2mTempSensor | ai, ai, longin | temperature, humidity, battery |
| mqttZ2mLight | longin, longin, stringin, stringout, longout | brightness, color_temp, state, set_state, set_brightness |
| mqttZ2mSwitch | stringin, stringout | state, set_state |
| mqttZ2mMotion | stringin, longin | occupancy, battery |
| mqttZ2mRemote2 | stringin, longin | action, battery |
IOC Binary (with Z2M)
use register_mqtt_commands;
use register_z2m_commands;
let mut app = new;
app = register_asyn_device_support;
app = register_mqtt_commands;
app = register_z2m_commands; // adds mqttZ2m* commands
iocsh Commands
Core
| Command | Arguments | Description |
|---|---|---|
| mqttAddTopic | portName drvInfo | Register a topic before driver creation |
| mqttDriverConfigure | portName brokerUrl clientId [qos] [connPvName] | Create driver, connect to broker |
Z2M Builders
| Command | Arguments | Description |
|---|---|---|
| mqttZ2mPlug | port prefix dev topic | Smart plug (power/energy/temp/state) |
| mqttZ2mTempSensor | port prefix dev topic | Temp/humidity sensor |
| mqttZ2mLight | port prefix dev topic | Dimmable light |
| mqttZ2mSwitch | port prefix dev topic | On/off switch |
| mqttZ2mMotion | port prefix dev topic | Motion sensor |
| mqttZ2mRemote2 | port prefix dev topic | 2-button remote |
QoS values: 0 = at most once, 1 = at least once (default), 2 = exactly once
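As a rough illustration of how the optional qos argument could be interpreted, the sketch below maps the three documented values onto an enum and falls back to 1 when the argument is absent. `Qos` and `parse_qos` are hypothetical names, and treating an empty string like a missing argument is an assumption.

```rust
/// MQTT delivery guarantees corresponding to the values 0, 1 and 2.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Qos {
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

/// Hypothetical helper (not the crate's API): interpret the optional
/// qos argument of mqttDriverConfigure.
pub fn parse_qos(arg: Option<&str>) -> Result<Qos, String> {
    match arg.map(str::trim) {
        // Documented default: 1 (at least once). The empty-string case
        // is an assumption made for this sketch.
        None | Some("") => Ok(Qos::AtLeastOnce),
        Some("0") => Ok(Qos::AtMostOnce),
        Some("1") => Ok(Qos::AtLeastOnce),
        Some("2") => Ok(Qos::ExactlyOnce),
        Some(other) => Err(format!("invalid QoS '{}', expected 0, 1 or 2", other)),
    }
}
```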
Supported Record Types
| asyn Interface | FLAT Types | JSON Types | Direction |
|---|---|---|---|
| asynInt32 | FLAT:INT | JSON:INT | Read / Write |
| asynFloat64 | FLAT:FLOAT | JSON:FLOAT | Read / Write |
| asynUInt32Digital | FLAT:DIGITAL | JSON:DIGITAL | Read / Write |
| asynOctet | FLAT:STRING | JSON:STRING | Read / Write |
| asynInt32Array | FLAT:INTARRAY | — | Read / Write |
| asynFloat64Array | FLAT:FLOATARRAY | — | Read / Write |
Connection Status PV
mqttDriverConfigure's optional 5th argument creates a bi record wired to an
internal _MQTT_CONNECTED asyn parameter:
mqttDriverConfigure("MQTT1", "mqtt://broker:1883", "clientId", 1, "TEST:MQTT:Connected")
The record is defined with ZNAM="Disconnected", ONAM="Connected", and
ZSV="MAJOR" (disconnect raises a MAJOR alarm). The driver flips the
underlying value based on the MQTT event stream:
| Event | Connected PV |
|---|---|
| Incoming::ConnAck (fresh handshake / every reconnect) | 1 |
| Incoming::Publish (subscribed topic message) — when flag is 0 | 1 |
| Incoming::PingResp (keep-alive response) — when flag is 0 | 1 |
| eventloop.poll() returns Err(_) (any ConnectionError) | 0 |
Publish/PingResp fallbacks exist because some rumqttc errors are recoverable
state errors (e.g. packet-id collision) that return Err without tearing
down the TCP/MQTT session. Without these fallbacks the PV would latch at 0
even while data keeps flowing. Any subsequent proof-of-life packet (a
subscribed Publish, a PingResp to the keep-alive heartbeat) will automatically
restore the PV to 1.
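The transition rules in the table above amount to a small state function. The sketch below models them without depending on rumqttc: `LinkEvent` is a hypothetical stand-in for the relevant `Incoming` packets plus the poll error case, and `next_connected` is an illustrative name, not the driver's actual code.

```rust
/// Simplified stand-in for the events the driver reacts to (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinkEvent {
    /// Broker acknowledged a (re)connect handshake.
    ConnAck,
    /// A message arrived on a subscribed topic.
    Publish,
    /// Broker answered the keep-alive ping.
    PingResp,
    /// eventloop.poll() returned an error.
    PollError,
    /// Any other event; leaves the flag unchanged.
    Other,
}

/// Compute the next value of the Connected flag from the current value
/// and one event, following the table above.
pub fn next_connected(current: bool, event: LinkEvent) -> bool {
    match event {
        LinkEvent::ConnAck => true,
        // Proof of life: restores the flag after a recoverable error.
        LinkEvent::Publish | LinkEvent::PingResp => true,
        LinkEvent::PollError => false,
        LinkEvent::Other => current,
    }
}
```

Folding a sequence such as ConnAck, PollError, Publish through `next_connected` ends with the flag set, which is the recovery path described above: a recoverable error drops the PV to 0 and the next inbound packet restores it.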
Logging
mqtt-rs emits tracing events; they are silent unless your binary installs a
subscriber. The mqtt-ioc example does this via tracing_subscriber::fmt;
for your own IOC add the same (or any subscriber) to see mqtt-rs output:
| Event | Level | Message |
|---|---|---|
| Reconnect handshake | info | MQTT connected, subscribing to N topics |
| poll() error | error | MQTT connection error: <ConnectionError variant> |
| Connected=1 restored from Publish/PingResp | debug | MQTT {Publish,PingResp} received while Connected=0 — restoring Connected=1 |
With the default filter (info) you see reconnects and errors. Use
RUST_LOG=info,mqtt_rs=debug to additionally see the recovery path (useful
when diagnosing why the Connected PV moved).
Dependencies
- rumqttc — async MQTT client
- serde_json — JSON parsing
- asyn-rs — PortDriver framework
Examples
- examples/mqtt-ioc/ — Z2M device builders demo
- examples/mqtt-ioc/db/mqtt.db — generic MQTT records (no Z2M)