MQ Multi Bridge
A message queue bridge application written in Rust, designed to connect different messaging systems like RabbitMQ, Kafka, and NATS.
Status
This project is under active development. While many features are functional, APIs may change. Use with caution in production environments. It serves as an example application and is intended to test the bridge library https://github.com/marcomq/mq-bridge
Features
- Multiple Broker Support: Connect Kafka, NATS, AMQP (e.g., RabbitMQ), MQTT, and HTTP in any direction.
- HTTP Integration: Expose HTTP endpoints as message sources (e.g., for webhooks) or sinks (to call external APIs), with support for request-response patterns.
- File I/O: Use local files as a source (reading line-by-line) or a sink (appending messages).
- Performant: Built with Tokio for asynchronous, non-blocking I/O.
- Deduplication: Optional message deduplication to prevent processing duplicates (requires a persistent on-disk database).
- Observable: Structured (JSON) logging and Prometheus metrics for observability.
- Configurable: Easily configured via a file or environment variables.
Getting Started
Prerequisites
- Rust toolchain (latest stable version recommended)
- Access to the message brokers you want to connect (e.g., Kafka, NATS, RabbitMQ)
Installation
Docker (Recommended)
The easiest way to run the application is using the pre-built Docker image, which includes all necessary dependencies (like the IBM MQ client).
Cargo
If you have Rust installed, you can install the application directly from source. Note that you may need to install development libraries for the brokers you intend to use (e.g., librdkafka-dev for Kafka, or the IBM MQ client).
Build from Source
- Clone the repository:
- Build and run:
- Configure the application: Create a config.yaml file in the project root or set environment variables. See the Configuration section for details.
Build Docker Image (doesn't require local Rust)
- Prerequisites: Docker and Docker Compose must be installed.
- Start Services:
This will start Kafka, NATS, and the bridge application.
Configuration
The application can be configured in three ways, with the following order of precedence (lower numbers are overridden by higher numbers):
1. Default Values: The application has built-in default values for most settings.
2. Configuration File: A file named config.[yaml|json|toml] can be placed in the application's working directory.
3. Environment Variables: Any setting can be overridden using environment variables.
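The precedence order above can be sketched in a few lines of Rust. This is a minimal illustration, not the application's actual loading code: the `Layer` and `Resolved` types, the `metrics_port` field, and the default values are hypothetical names chosen for the example.

```rust
/// One configuration layer; `None` means "not set in this layer".
/// (Hypothetical type for illustration only.)
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Layer {
    pub log_level: Option<String>,
    pub metrics_port: Option<u16>,
}

/// Fully resolved settings after all layers are merged.
#[derive(Debug, Clone, PartialEq)]
pub struct Resolved {
    pub log_level: String,
    pub metrics_port: u16,
}

/// Merge layers so that later sources win: defaults < file < environment.
pub fn resolve(file: &Layer, env: &Layer) -> Resolved {
    // Assumed built-in defaults, chosen only for this sketch.
    let default_log_level = "info".to_string();
    let default_metrics_port: u16 = 9090;

    Resolved {
        // Environment first, then file, then the built-in default.
        log_level: env
            .log_level
            .clone()
            .or_else(|| file.log_level.clone())
            .unwrap_or(default_log_level),
        metrics_port: env
            .metrics_port
            .or(file.metrics_port)
            .unwrap_or(default_metrics_port),
    }
}
```

With a file layer that sets both fields and an environment layer that sets only `log_level`, `resolve` takes `log_level` from the environment and `metrics_port` from the file.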
Configuration File
You can create a configuration file (e.g., config.yaml) to specify your settings. This is the recommended approach for managing complex route configurations.
Example config.yaml:
# General settings
log_level: "info"
# Define bridge routes from a source to a sink
routes:
  my_kafka_to_nats:
    input:
      kafka:
        brokers: "kafka-us.example.com:9092"
        group_id: "bridge-group-us"
        # topic is optional, defaults to route name
    output:
      nats:
        url: "nats://nats.example.com:4222"
        stream: "events"
        # subject is optional, defaults to route name
  amqp_to_kafka_orders:
    input:
      amqp:
        url: "amqp://user:pass@rabbitmq.example.com:5672"
        # queue is optional, defaults to route name
    output:
      kafka:
        brokers: "kafka-eu.example.com:9092"
        group_id: "bridge-group-eu"
        # topic is optional, defaults to route name
  webhook_to_kafka:
    input:
      http:
        url: "0.0.0.0:9090"
    output:
      kafka:
        brokers: "kafka-eu.example.com:9092"
        group_id: "bridge-group-eu"
        # topic defaults to "webhook_to_kafka"
  kafka_to_url:
    input:
      kafka:
        brokers: "kafka-eu.example.com:9092"
        group_id: "bridge-group-eu"
        topic: "outgoing.events"
    output:
      http:
        url: "https://api.example.com/ingest" # Override default URL
  file_to_kafka:
    input:
      file:
        path: "/var/data/input.log"
    output:
      kafka:
        brokers: "kafka-eu.example.com:9092"
        group_id: "bridge-group-eu"
        topic: "from_file"
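Several comments in the example note that the topic, subject, or queue falls back to the route name when it is omitted. A minimal sketch of that fallback is shown below. The function name `effective_name` is hypothetical, and treating an empty string like a missing value is an assumption of this sketch, not documented behavior.

```rust
/// Return the explicitly configured topic/subject/queue name,
/// or fall back to the route name when none is configured.
/// (Hypothetical helper; an empty string is treated as "not set"
/// purely as an assumption of this sketch.)
pub fn effective_name<'a>(route_name: &'a str, configured: Option<&'a str>) -> &'a str {
    match configured {
        Some(name) if !name.is_empty() => name,
        _ => route_name,
    }
}
```

Under this sketch, the `webhook_to_kafka` route with no topic set would publish to `webhook_to_kafka`, while `kafka_to_url` would consume from its explicit `outgoing.events` topic.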
Environment Variables
All configuration parameters can be set via environment variables. This is particularly useful for containerized deployments (e.g., Docker, Kubernetes). The variables must be prefixed with MQB_, and nested keys are separated by a double underscore __. For map-like structures such as routes, the key becomes part of the variable name.
Example using environment variables:
# General settings
# Metrics
# Route 'kafka_us_to_nats_events': kafka -> nats
# topic is optional
# DLQ for Route 'kafka_us_to_nats_events'
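The naming rule described above (the MQB_ prefix, with nested keys joined by a double underscore) can be illustrated with a small helper. This is a sketch only: `env_var_name` is a hypothetical function, and upper-casing each key segment is an assumption based on common convention rather than something this document states.

```rust
/// Build an environment variable name from a configuration key path.
/// (Hypothetical helper; upper-casing the segments is an assumption.)
pub fn env_var_name(path: &[&str]) -> String {
    let joined = path
        .iter()
        .map(|segment| segment.to_uppercase())
        .collect::<Vec<_>>()
        // Nested keys are separated by a double underscore.
        .join("__");
    // Every variable carries the MQB_ prefix.
    format!("MQB_{}", joined)
}
```

For the key path `routes` / `kafka_us_to_nats_events` / `input` / `kafka` / `brokers`, the helper yields `MQB_ROUTES__KAFKA_US_TO_NATS_EVENTS__INPUT__KAFKA__BROKERS`. Single underscores inside a key stay as they are, so only the double underscore marks a nesting level.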
Example Configurations
This repository includes a set of example configurations in the /examples directory to help you get started quickly. These examples are also included in the Docker image under /app/examples.
You can use them with Docker by mounting them from your host or by referencing them from within the image using the --config flag:
# Using an example from the host
# Using an example from within the image
Available Examples:
- http-to-kafka.yml: Exposes an HTTP endpoint and forwards incoming requests to a Kafka topic.
- kafka-to-nats.yml: A simple route from a Kafka topic to a NATS subject.
- rabbitmq-to-file.yml: Reads messages from a RabbitMQ queue and appends them to a log file (requires mounting a volume for /data).
Using a .env file
For local development, you can place a .env file in the root of the project. The application will automatically load the variables from this file.
Architecture & Web UI
This application demonstrates a unique usage of the mq-bridge library itself to serve its own management UI.
Backend: mq-bridge as a Web Server
Instead of using a traditional web framework like Actix or Axum directly for the management API, the application uses mq-bridge's internal routing mechanism:
- HTTP Input: An http input endpoint listens on the configured UI port. It converts incoming HTTP requests into CanonicalMessages.
- WebUiHandler: A custom Handler processes these messages. It acts as a router, serving static files (HTML, JS) or handling API requests (e.g., /config, /schema.json).
- Response Output: The handler returns a response message, which is sent to a response output endpoint, completing the HTTP request-response cycle.
This approach showcases the library's ability to handle request-reply patterns and serve as a lightweight web server.
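The request-to-response flow described above can be sketched with plain standard-library types. The `Message` struct and `handle` function below are hypothetical stand-ins and do not reproduce the actual CanonicalMessage or Handler definitions from mq-bridge. The metadata keys and response bodies are likewise placeholders.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a message carrying an HTTP request or response.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub metadata: HashMap<String, String>,
    pub payload: Vec<u8>,
}

impl Message {
    /// Build a request message for the given path (assumed metadata key: "path").
    pub fn request(path: &str) -> Self {
        let mut metadata = HashMap::new();
        metadata.insert("path".to_string(), path.to_string());
        Message { metadata, payload: Vec::new() }
    }

    /// Build a response message with a status code, content type, and body.
    fn response(status: u16, content_type: &str, body: &str) -> Self {
        let mut metadata = HashMap::new();
        metadata.insert("status".to_string(), status.to_string());
        metadata.insert("content-type".to_string(), content_type.to_string());
        Message { metadata, payload: body.as_bytes().to_vec() }
    }
}

/// Act as a router: map the request path to a response message.
pub fn handle(request: &Message) -> Message {
    match request.metadata.get("path").map(String::as_str) {
        // Static content (placeholder body).
        Some("/") => Message::response(200, "text/html", "<h1>UI</h1>"),
        // API endpoints (placeholder bodies).
        Some("/schema.json") => Message::response(200, "application/json", "{}"),
        Some("/config") => Message::response(200, "application/json", "{\"routes\":{}}"),
        // Anything else is not found.
        _ => Message::response(404, "text/plain", "not found"),
    }
}
```

The design point is that the handler only sees messages in and messages out, so the same routing logic could in principle sit behind any input that supports request-reply.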
Frontend: vanilla-schema-forms
The Web UI is dynamically generated from the Rust configuration structures:
- Schema Generation: The backend uses schemars to generate a JSON Schema for the AppConfig struct at runtime. This is exposed via /schema.json.
- Dynamic Form: The frontend uses vanilla-schema-forms to render a complete configuration form based solely on this schema.
- No UI Code Changes: When new features or configuration options are added to the Rust code (e.g., a new middleware), the schema updates automatically, and the UI reflects these changes without requiring any frontend code modifications.
Using as a Library
Beyond running as a standalone application, the core logic is available as a library crate mq-bridge to interact with various message brokers using a unified API. This is useful for building custom applications that need to produce or consume messages without being tied to a specific broker's SDK.
The core of the library consists of the MessageConsumer and MessagePublisher traits, found in mq_bridge::traits.
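To give a feel for what a broker-agnostic consumer/publisher pair enables, here is a simplified, synchronous sketch. The traits `SketchConsumer` and `SketchPublisher`, the `MemoryQueue` type, and `forward_all` are all hypothetical and deliberately named differently from the real traits. The actual signatures in mq_bridge::traits are likely async and will differ, so consult the crate documentation for them.

```rust
use std::collections::VecDeque;

/// Hypothetical publisher abstraction; NOT the real mq-bridge trait.
pub trait SketchPublisher {
    fn publish(&mut self, payload: Vec<u8>) -> Result<(), String>;
}

/// Hypothetical consumer abstraction; NOT the real mq-bridge trait.
pub trait SketchConsumer {
    /// Return the next pending message, or `None` if nothing is queued.
    fn receive(&mut self) -> Option<Vec<u8>>;
}

/// In-memory queue that plays the role of a broker in this sketch.
#[derive(Default)]
pub struct MemoryQueue {
    messages: VecDeque<Vec<u8>>,
}

impl SketchPublisher for MemoryQueue {
    fn publish(&mut self, payload: Vec<u8>) -> Result<(), String> {
        self.messages.push_back(payload);
        Ok(())
    }
}

impl SketchConsumer for MemoryQueue {
    fn receive(&mut self) -> Option<Vec<u8>> {
        self.messages.pop_front()
    }
}

/// Move every pending message from a source to a sink and return the count.
/// The function depends only on the traits, not on a concrete broker.
pub fn forward_all(
    source: &mut dyn SketchConsumer,
    sink: &mut dyn SketchPublisher,
) -> Result<usize, String> {
    let mut forwarded = 0;
    while let Some(payload) = source.receive() {
        sink.publish(payload)?;
        forwarded += 1;
    }
    Ok(forwarded)
}
```

Because `forward_all` is written against the traits alone, swapping the in-memory queue for a broker-backed implementation would not require changing the forwarding logic.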
License
This project is licensed under the MIT License - see the LICENSE file for details.