Convoy
A reliable MQTT bridge with SQLite message caching - for edge devices with patchy connectivity.
This is an opinionated implementation. Currently it has the following limitations:
- TLS support via
native-tlsand SQLite support via system library - to reduce binary size for edge environments - Topic mapping is not fully dynamic; currently always based on prefix on remote broker
- No MQTT v5 support (only v3.1.1)
Features
- Bidirectional MQTT bridging between local and remote brokers
- SQLite-backed message cache for local→remote messages when remote is unavailable
- Automatic cache replay when connection is restored (FIFO order)
- Topic mapping with wildcard support (
+,#) - TLS support for secure remote connections
- MQTT v3.1.1 support (default)
- Bridge state publishing with Last Will and Testament (LWT)
- Configurable cache policies (size limits, eviction strategies)
Use Cases
- Edge-to-Cloud: Bridge edge devices to cloud MQTT brokers with resilience to network outages
- IoT Gateways: Forward sensor data from local networks to remote servers
- Data Aggregation: Collect telemetry from local devices and batch forward to cloud
Quick Start
As a Standalone Application
1. Build
2. Configure
Copy the example configuration and edit it:
# Edit config.toml with your broker settings
3. Run
Or with debug logging:
As a Library
Add to your Cargo.toml (no default features suppresses default CLI dependencies):
[]
= { = "0.1", = false }
Example usage (see examples/programmatic.rs for a complete example):
use ;
use Arc;
async
Run the example:
Configuration
See config.example.toml for a fully documented configuration template. Key sections:
Bridge Settings
[]
# Local broker (no auth/TLS)
= "127.0.0.1:1883"
= "convoy-local"
# Remote broker (TLS + auth)
= "mqtt.example.com:8883"
= "convoy-remote"
= "device01"
= "secret"
# Bridge state topic (published to remote)
= "bridge/convoy/state"
= "1"
= "0"
Topic Forwarding (Local → Remote)
Messages are cached if remote is unavailable:
[[]]
= "sensors/#"
= "devices/edge1/"
= 1
Example: Local sensors/temp → Remote devices/edge1/sensors/temp
Topic Subscription (Remote → Local)
Messages are NOT cached (real-time only):
[[]]
= "commands/edge1/#"
= "commands/edge1/"
= 1
Example: Remote commands/edge1/restart → Local restart
Cache Settings
[]
= "/var/lib/convoy/cache.sqlite"
= 500000 # Maximum cached messages
= "drop_oldest" # or "reject_new"
= 1000 # Messages per replay batch
= 100 # Replay interval
How It Works
Message Flow
-
Local → Remote (with caching):
- Bridge subscribes to configured topics on local broker
- When message arrives, applies topic mapping
- If remote connected: publishes immediately
- If remote down or publish fails: caches to SQLite
- On reconnect: replays cache in FIFO order
-
Remote → Local (no caching):
- Bridge subscribes to configured topics on remote broker
- When message arrives, applies topic mapping and publishes to local
- If local is down, message is lost (no caching)
Topic Mapping
-
Wildcards:
+matches single level:sensors/+/tempmatchessensors/room1/temp#matches multiple levels:data/#matchesdata/sensor/temp/value
-
Prefix mapping:
- Forward: prepends
remote_prefixto local topic - Subscribe: strips
remote_prefixfrom remote topic
- Forward: prepends
Bridge State
- On connect: publishes online state to
state_topic(retained) - On disconnect: LWT publishes offline state to
state_topic - Allows remote systems to monitor bridge health
Architecture
┌──────────────────┐
│ Local Broker │ (localhost:1883, no auth)
│ (Mosquitto) │
└────────┬─────────┘
│
┌────▼────┐
│ Bridge │ (rumqttc clients)
│ Client │
└────┬────┘
│
┌────▼────┐
│ SQLite │ (cache A→B only)
│ Cache │
└────┬────┘
│
┌────────▼─────────┐
│ Remote Broker │ (TLS, port 8883)
│ (e.g. AWS IoT) │
└──────────────────┘
Cargo Features
cli(default): Enables the command-line interface with TOML config file support- Required dependencies:
clap,toml,tracing-subscriber - Use this feature when building the standalone binary
- Library users don't need this feature (set
default-features = false)
- Required dependencies:
Implementation Notes
- MQTT Protocol: v3.1.1 (via rumqttc)
- Cache: SQLite with WAL mode for concurrency
- TLS: native-tls with support for:
- Custom CA certificates (PEM format)
- Client certificates for mTLS (PKCS12 format)
- System certificate store as fallback
- Async Runtime: Tokio
Testing
Run unit tests:
For integration testing with actual MQTT brokers, see SPECS.md section 6.
CLI Options
convoy [OPTIONS]
Options:
-c, --config <CONFIG> Path to config file [default: config.toml]
-l, --log-level <LOG_LEVEL> Log level (trace|debug|info|warn|error) [default: info]
-h, --help Print help
License
See SPECS.md for design documentation and acceptance criteria.