# mq-bridge Project Context
## Project Overview
`mq-bridge` is an asynchronous message bridging library for Rust that connects different messaging systems, data stores, and protocols. It acts as a **programmable integration layer** that supports message transformation, filtering, programmatic handlers, event processing, and complex routing.
## Core Architecture
### Key Concepts
1. **Route**: A named data pipeline that defines a flow from one `input` endpoint to one `output` endpoint
2. **Endpoint**: A source (consumer) or sink (publisher) for messages - supports Kafka, NATS, AMQP, MQTT, MongoDB, ZeroMQ, HTTP, Files, and in-memory channels
3. **Top-Level API**: A set of functions in the `mq_bridge` root for managing routes and publishers (e.g., `get_route`, `stop_route`, `get_publisher`).
4. **Middleware**: Components that intercept and process messages (retries, DLQ, deduplication, metrics)
5. **Handler**: Programmatic components for business logic (CommandHandler for 1-to-1 transformations, EventHandler for terminal consumption)
6. **CanonicalMessage**: The unified message format used throughout the system
7. **Request-Reply**: Native support for the synchronous request-reply pattern on NATS, MongoDB, and Memory endpoints
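The split between `CanonicalMessage`, `CommandHandler`, and `EventHandler` can be illustrated with a small synchronous, std-only sketch. The `payload`/`metadata` fields and both trait signatures below are assumptions made for illustration. The crate's real definitions are async (via `async-trait`) and may look different.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `CanonicalMessage` (field names are assumptions).
#[derive(Debug, Clone, PartialEq)]
pub struct CanonicalMessage {
    pub payload: Vec<u8>,
    pub metadata: HashMap<String, String>,
}

impl CanonicalMessage {
    pub fn new(payload: &[u8]) -> Self {
        Self { payload: payload.to_vec(), metadata: HashMap::new() }
    }
}

/// Hypothetical 1-to-1 handler: every input yields exactly one output message.
pub trait CommandHandler {
    fn handle(&self, msg: CanonicalMessage) -> CanonicalMessage;
}

/// Hypothetical terminal handler: the message is consumed, nothing is forwarded.
pub trait EventHandler {
    fn handle(&mut self, msg: &CanonicalMessage);
}

/// Example command handler: upper-cases the payload and tags the metadata.
struct Uppercase;

impl CommandHandler for Uppercase {
    fn handle(&self, mut msg: CanonicalMessage) -> CanonicalMessage {
        msg.payload.make_ascii_uppercase();
        msg.metadata.insert("transformed_by".to_string(), "uppercase".to_string());
        msg
    }
}

/// Example event handler: only counts the messages it has consumed.
#[derive(Default)]
struct Counter {
    seen: usize,
}

impl EventHandler for Counter {
    fn handle(&mut self, _msg: &CanonicalMessage) {
        self.seen += 1;
    }
}

fn main() {
    let out = Uppercase.handle(CanonicalMessage::new(b"order-42"));
    let mut counter = Counter::default();
    counter.handle(&out);
    println!("{} (events seen: {})", String::from_utf8_lossy(&out.payload), counter.seen);
}
```

The asymmetry in the signatures carries the distinction: a command handler returns a new message that can continue down the route, while an event handler returns nothing and ends the flow.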
### Design Principles
- **Protocol Abstraction**: Write business logic against `CanonicalMessage`, swap transports via configuration
- **Unopinionated**: Doesn't enforce specific architectural patterns (CQRS/ES), focuses on reliable data movement
- **Async-First**: Built on Tokio for efficient async I/O
- **Feature Flags**: Optional dependencies via Cargo features (kafka, amqp, nats, mqtt, mongodb, zeromq, http, metrics, dedup)
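Protocol abstraction means business logic is written once against a trait and the transport is chosen by configuration. In the sketch below, `Publisher`, `MemoryPublisher`, `StdoutPublisher`, and `publisher_from_config` are hypothetical names used only for illustration. They are not the crate's `MessagePublisher` API, which is async.

```rust
/// Hypothetical, simplified publisher abstraction.
trait Publisher {
    fn publish(&mut self, payload: &str) -> Result<(), String>;
}

/// Stand-in for an in-memory endpoint: collects messages in a Vec.
#[derive(Default)]
struct MemoryPublisher {
    sent: Vec<String>,
}

impl Publisher for MemoryPublisher {
    fn publish(&mut self, payload: &str) -> Result<(), String> {
        self.sent.push(payload.to_string());
        Ok(())
    }
}

/// Stand-in for a network transport; here it just prints.
struct StdoutPublisher;

impl Publisher for StdoutPublisher {
    fn publish(&mut self, payload: &str) -> Result<(), String> {
        println!("-> {payload}");
        Ok(())
    }
}

/// Business logic written against the trait, not against a transport.
/// Forwards orders of at least 100 and returns how many were sent.
fn forward_large_orders(orders: &[u32], out: &mut dyn Publisher) -> Result<usize, String> {
    let mut count = 0;
    for &amount in orders.iter().filter(|&&a| a >= 100) {
        out.publish(&format!("order amount={amount}"))?;
        count += 1;
    }
    Ok(count)
}

/// "Configuration" picks the transport at runtime.
fn publisher_from_config(kind: &str) -> Box<dyn Publisher> {
    match kind {
        "stdout" => Box::new(StdoutPublisher),
        _ => Box::new(MemoryPublisher::default()),
    }
}

fn main() {
    let mut out = publisher_from_config("stdout");
    let n = forward_large_orders(&[50, 150, 300], &mut *out).unwrap();
    println!("forwarded {n}");
}
```

Swapping `"stdout"` for another value changes where messages go without touching `forward_large_orders`, which is the property the design principle describes.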
### File Structure
```
src/
├── lib.rs # Main library entry point
├── canonical_message.rs # Unified message format
├── traits.rs # Core traits (MessageConsumer, MessagePublisher, Handler)
├── models.rs # Configuration models (Route, Endpoint, Middleware)
├── route.rs # Route execution logic (sequential/concurrent)
├── endpoints/ # Endpoint implementations
│ ├── mod.rs # Factory functions for creating consumers/publishers
│ ├── kafka.rs # Kafka consumer/publisher
│ ├── nats.rs # NATS consumer/publisher
│ ├── amqp.rs # AMQP (RabbitMQ) consumer/publisher
│ ├── mqtt.rs # MQTT consumer/publisher
│ ├── mongodb.rs # MongoDB consumer/publisher
│ ├── http.rs # HTTP consumer/publisher
│ ├── memory.rs # In-memory channels (for testing)
│ ├── file.rs # File-based endpoints
│ ├── fanout.rs # Fanout publisher (broadcast)
│ ├── switch.rs # Content-based routing
│ └── null.rs # Null endpoint (sink)
├── middleware/ # Middleware implementations
│ ├── retry.rs # Exponential backoff retry
│ ├── dlq.rs # Dead-letter queue
│ ├── deduplication.rs # Message deduplication (sled)
│ ├── metrics.rs # Metrics collection
│ └── random_panic.rs # Testing middleware
├── command_handler.rs # Command handler wrapper
├── event_handler.rs # Event handler wrapper
├── event_store.rs # In-memory event store
├── type_handler.rs # Typed message handlers
├── errors.rs # Error types
└── outcomes.rs # Result types (Handled, Sent, Received)
```
### Consumer vs Subscriber
- **Consumer**: Persistent mode - uses consumer groups/durable queues, resumes from last committed position
- **Subscriber**: Ephemeral mode - unique IDs per instance, receives only new messages
Both implement the `MessageConsumer` trait. Subscribers often wrap consumers (e.g., `MemorySubscriber` wraps `MemoryConsumer`).
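The difference between the two modes comes down to where the read position lives. A toy model, assuming a single append-only log in place of a real broker (`Log`, `consume`, `subscribe`, and `poll` are hypothetical names): a consumer group's position is committed and survives between reads, while a subscriber gets a throwaway cursor that starts at the current end.

```rust
use std::collections::HashMap;

/// Toy stand-in for a broker topic: an append-only list of messages plus the
/// durable, committed read position of each consumer group.
struct Log {
    entries: Vec<String>,
    committed: HashMap<String, usize>,
}

impl Log {
    fn new() -> Self {
        Self { entries: Vec::new(), committed: HashMap::new() }
    }

    fn append(&mut self, msg: &str) {
        self.entries.push(msg.to_string());
    }

    /// Consumer mode: resume from the group's committed position (0 for a new
    /// group), then commit everything that was just read.
    fn consume(&mut self, group: &str) -> Vec<String> {
        let start = self.committed.get(group).copied().unwrap_or(0);
        let batch = self.entries[start..].to_vec();
        self.committed.insert(group.to_string(), self.entries.len());
        batch
    }

    /// Subscriber mode: an ephemeral cursor that starts at the current end,
    /// so only messages appended after subscribing are delivered.
    fn subscribe(&self) -> usize {
        self.entries.len()
    }

    fn poll(&self, cursor: &mut usize) -> Vec<String> {
        let batch = self.entries[*cursor..].to_vec();
        *cursor = self.entries.len();
        batch
    }
}

fn main() {
    let mut log = Log::new();
    log.append("a");
    println!("billing: {:?}", log.consume("billing")); // ["a"]
    log.append("b");
    let mut cursor = log.subscribe();
    log.append("c");
    println!("subscriber: {:?}", log.poll(&mut cursor)); // ["c"]
    println!("billing: {:?}", log.consume("billing")); // ["b", "c"]
}
```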
### Route Execution
Routes can run in two modes:
- **Sequential** (`concurrency: 1`): Single-threaded processing
- **Concurrent** (`concurrency > 1`): Worker pool with configurable concurrency
Routes handle graceful shutdown via a `shutdown_rx` channel. The `select!` macro cancels in-flight `receive_batch()` futures when a shutdown signal is received.
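The two execution modes can be approximated with plain threads. This is a std-only analogue, not the crate's Tokio implementation: `run_route` and `process` are hypothetical, and a shared `AtomicBool` checked between `recv_timeout` calls stands in for `shutdown_rx` plus `select!`, since std has no future cancellation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Placeholder for per-message work (hypothetical).
fn process(msg: u64) -> u64 {
    msg * 2
}

/// `concurrency <= 1` processes messages in one loop and keeps input order.
/// Larger values start a pool of worker threads sharing one queue; their
/// results are sorted because workers finish in no particular order.
/// Setting `shutdown` makes every loop stop at its next check.
fn run_route(inputs: Vec<u64>, concurrency: usize, shutdown: Arc<AtomicBool>) -> Vec<u64> {
    if concurrency <= 1 {
        return inputs
            .into_iter()
            .take_while(|_| !shutdown.load(Ordering::SeqCst))
            .map(process)
            .collect();
    }

    let (tx, rx) = mpsc::channel::<u64>();
    let rx = Arc::new(Mutex::new(rx));
    let results = Arc::new(Mutex::new(Vec::new()));

    let workers: Vec<_> = (0..concurrency)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let results = Arc::clone(&results);
            let shutdown = Arc::clone(&shutdown);
            thread::spawn(move || loop {
                if shutdown.load(Ordering::SeqCst) {
                    break;
                }
                // Hold the lock only while taking one message; the timeout lets
                // the worker re-check `shutdown` instead of blocking forever.
                let next = rx.lock().unwrap().recv_timeout(Duration::from_millis(10));
                match next {
                    Ok(msg) => {
                        let value = process(msg);
                        results.lock().unwrap().push(value);
                    }
                    Err(RecvTimeoutError::Timeout) => continue,
                    Err(RecvTimeoutError::Disconnected) => break,
                }
            })
        })
        .collect();

    for msg in inputs {
        tx.send(msg).unwrap();
    }
    drop(tx); // closing the queue lets workers exit once it is drained

    for worker in workers {
        worker.join().unwrap();
    }
    let mut out = Arc::try_unwrap(results).unwrap().into_inner().unwrap();
    out.sort_unstable();
    out
}

fn main() {
    let shutdown = Arc::new(AtomicBool::new(false));
    println!("{:?}", run_route(vec![3, 1, 2], 1, Arc::clone(&shutdown)));
    println!("{:?}", run_route((1..=8).collect(), 4, shutdown));
}
```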
### Testing
- **Unit tests**: In each module
- **Integration tests**: `tests/integration/` - require Docker services
- **Performance tests**: `tests/performance_pipeline.rs` and `benches/performance_bench.rs`
- **Memory tests**: `tests/memory_test.rs` - no external dependencies
Integration tests use Docker Compose files in `tests/integration/docker-compose/`.
### Configuration
Routes are defined via:
- YAML files
- JSON files
- Environment variables (prefix: `MQB__`, separator: `__`)
Example:
```yaml
kafka_to_nats:
  concurrency: 4
  batch_size: 128
  input:
    kafka:
      topic: "orders"
      url: "localhost:9092"
  output:
    nats:
      subject: "orders.processed"
      url: "nats://localhost:4222"
```
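With the `MQB__` prefix and `__` separator, each `__` presumably marks one level of nesting, so `MQB__KAFKA_TO_NATS__CONCURRENCY=4` would set `concurrency` on the route above. The helper below is a hypothetical illustration of that naming convention, assuming keys are lowercased. It is not the `config` crate's own environment source.

```rust
/// Hypothetical helper: maps an environment variable name to a dotted config
/// path under the `MQB__` prefix, or returns None for unrelated variables.
fn env_key_to_path(var: &str) -> Option<String> {
    let rest = var.strip_prefix("MQB__")?;
    if rest.is_empty() {
        return None;
    }
    Some(
        rest.split("__")
            .map(|segment| segment.to_lowercase())
            .collect::<Vec<_>>()
            .join("."),
    )
}

fn main() {
    let vars = [
        ("MQB__KAFKA_TO_NATS__CONCURRENCY", "4"),
        ("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC", "orders"),
        ("PATH", "/usr/bin"),
    ];
    for &(name, value) in vars.iter() {
        if let Some(path) = env_key_to_path(name) {
            println!("{path} = {value}");
        }
    }
}
```

Using a double underscore as the separator is what allows single underscores to survive inside names such as `kafka_to_nats`.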
### Common Patterns
1. **Creating endpoints**: Use factory functions in `endpoints/mod.rs` (`create_consumer_from_route`, `create_publisher_from_route`)
2. **Adding middleware**: Middlewares are applied in `middleware/mod.rs` via `apply_middlewares_to_consumer` and `apply_middlewares_to_publisher`
3. **Custom endpoints**: Implement `CustomEndpointFactory` trait
4. **Custom middleware**: Implement `CustomMiddlewareFactory` trait
5. **Typed handlers**: Use `TypeHandler` to deserialize messages into Rust types based on `kind` metadata field
6. **Request-Reply**: Enable via `request_reply: true` in publisher config (NATS, MongoDB, Memory). Uses UUID v7 for correlation.
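Typed dispatch on the `kind` metadata field (pattern 5) amounts to a lookup from `kind` to a decoder. In this sketch `KindDispatcher`, `Command`, and the plain-text payloads are hypothetical. The real `TypeHandler` deserializes into Rust types, which this std-only version replaces with `str::parse`.

```rust
use std::collections::HashMap;

/// Hypothetical typed commands the payloads decode into.
#[derive(Debug, PartialEq)]
enum Command {
    CreateOrder { id: u32 },
    CancelOrder { id: u32 },
}

type Decoder = fn(&str) -> Result<Command, String>;

fn decode_create(payload: &str) -> Result<Command, String> {
    payload.parse::<u32>().map(|id| Command::CreateOrder { id }).map_err(|e| e.to_string())
}

fn decode_cancel(payload: &str) -> Result<Command, String> {
    payload.parse::<u32>().map(|id| Command::CancelOrder { id }).map_err(|e| e.to_string())
}

/// Registry of decoders keyed by the `kind` metadata value.
struct KindDispatcher {
    decoders: HashMap<&'static str, Decoder>,
}

impl KindDispatcher {
    fn new() -> Self {
        let mut decoders: HashMap<&'static str, Decoder> = HashMap::new();
        decoders.insert("create_order", decode_create);
        decoders.insert("cancel_order", decode_cancel);
        Self { decoders }
    }

    /// Looks up `kind`, then decodes the payload with the matching decoder.
    fn decode(&self, metadata: &HashMap<String, String>, payload: &str) -> Result<Command, String> {
        let kind = metadata.get("kind").ok_or("missing `kind` metadata")?;
        let decoder = self
            .decoders
            .get(kind.as_str())
            .ok_or(format!("no handler for kind `{kind}`"))?;
        decoder(payload)
    }
}

fn main() {
    let dispatcher = KindDispatcher::new();
    let mut metadata = HashMap::new();
    metadata.insert("kind".to_string(), "cancel_order".to_string());
    println!("{:?}", dispatcher.decode(&metadata, "7"));
}
```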
### Error Handling
- `ConsumerError`: Errors during message consumption (Connection, EndOfStream)
- `ProcessingError` (aliased as `HandlerError`/`PublisherError`): Errors during processing (Retryable, NonRetryable)
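The Retryable/NonRetryable split is what lets a retry middleware decide whether to try again. A minimal sketch, assuming a local `ProcessingError` enum that only mirrors the two variant names above (the `String` payload is an assumption) and a hypothetical `with_retry` helper with exponential backoff capped at 30 seconds. The `sleep` callback is injected so delays can be inspected rather than waited on.

```rust
use std::time::Duration;

/// Hypothetical mirror of the two `ProcessingError` variants.
#[derive(Debug, PartialEq)]
enum ProcessingError {
    Retryable(String),
    NonRetryable(String),
}

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
fn backoff(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}

/// Calls `op` up to `max_attempts` times, retrying only on `Retryable`.
/// Success, `NonRetryable`, or the last failure are returned unchanged.
fn with_retry<T>(
    max_attempts: u32,
    base: Duration,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, ProcessingError>,
) -> Result<T, ProcessingError> {
    let mut attempt = 0;
    loop {
        match op() {
            Err(ProcessingError::Retryable(_)) if attempt + 1 < max_attempts => {
                sleep(backoff(attempt, base, Duration::from_secs(30)));
                attempt += 1;
            }
            other => return other,
        }
    }
}

fn main() {
    let mut calls = 0;
    let result = with_retry(5, Duration::from_millis(100), |d| println!("retrying in {d:?}"), || {
        calls += 1;
        if calls < 3 {
            Err(ProcessingError::Retryable(format!("attempt {calls} failed")))
        } else {
            Ok(calls)
        }
    });
    println!("{result:?}");
}
```

In a real route the sleep would be an async timer, and a `NonRetryable` error would typically be handed on (for example to the DLQ middleware) instead of being retried.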
### Performance Considerations
- Batch processing is preferred (`receive_batch`, `send_batch`)
- Concurrency is configurable per route
- Middleware adds overhead - use only what's needed
- Memory endpoints are fastest (no I/O), useful for testing
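Batching amortizes per-call overhead (network round trips, acknowledgements) over many messages. The sketch below shows the general shape of a batching loop; `pump_in_batches` is hypothetical, and the `send_batch` closure only stands in for the crate's publisher method. The batch size of 128 matches the `batch_size` in the configuration example.

```rust
/// Pulls messages from `source` and hands off slices of at most `batch_size`
/// messages to `send_batch`. Returns the number of batches sent.
fn pump_in_batches<I, F>(source: I, batch_size: usize, mut send_batch: F) -> usize
where
    I: IntoIterator<Item = String>,
    F: FnMut(&[String]),
{
    assert!(batch_size > 0, "batch_size must be positive");
    let mut buf = Vec::with_capacity(batch_size);
    let mut batches = 0;
    for msg in source {
        buf.push(msg);
        if buf.len() == batch_size {
            send_batch(&buf[..]);
            buf.clear();
            batches += 1;
        }
    }
    if !buf.is_empty() {
        send_batch(&buf[..]); // flush the final partial batch
        batches += 1;
    }
    batches
}

fn main() {
    let messages = (0..300).map(|i| format!("msg-{i}"));
    let batches = pump_in_batches(messages, 128, |batch| {
        println!("send_batch with {} messages", batch.len());
    });
    println!("{batches} batches");
}
```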
### When Making Changes
1. **New endpoint**: Add to `endpoints/mod.rs` factory functions, implement `MessageConsumer`/`MessagePublisher` traits
2. **New middleware**: Add to `middleware/mod.rs`, implement middleware wrapper types
3. **Configuration changes**: Update `models.rs` with new config structs, add to `EndpointType`/`Middleware` enums
4. **Tests**: Add integration tests in `tests/integration/`, update Docker Compose if needed
5. **Documentation**: Update README.md and add doc comments
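"Middleware wrapper types" usually means the decorator pattern: a wrapper implements the same trait as the endpoint it wraps, so wrappers can be stacked transparently. A sketch with hypothetical names (`Publisher`, `VecPublisher`, `Dedup`). The in-memory `HashSet` is a swap-in for illustration; the crate's deduplication middleware uses sled for storage.

```rust
use std::collections::HashSet;

/// Hypothetical minimal publisher trait (not the crate's async `MessagePublisher`).
trait Publisher {
    fn send(&mut self, id: &str, payload: &str);
}

/// Innermost "endpoint": records payloads in order.
#[derive(Default)]
struct VecPublisher {
    sent: Vec<String>,
}

impl Publisher for VecPublisher {
    fn send(&mut self, _id: &str, payload: &str) {
        self.sent.push(payload.to_string());
    }
}

/// Middleware as a wrapper: forwards to `inner` only the first time an id is seen.
struct Dedup<P: Publisher> {
    inner: P,
    seen: HashSet<String>,
}

impl<P: Publisher> Dedup<P> {
    fn new(inner: P) -> Self {
        Self { inner, seen: HashSet::new() }
    }
}

impl<P: Publisher> Publisher for Dedup<P> {
    fn send(&mut self, id: &str, payload: &str) {
        // `insert` returns false when the id was already present.
        if self.seen.insert(id.to_string()) {
            self.inner.send(id, payload);
        }
    }
}

fn main() {
    let mut publisher = Dedup::new(VecPublisher::default());
    for &(id, payload) in [("1", "created"), ("1", "created (redelivered)"), ("2", "shipped")].iter() {
        publisher.send(id, payload);
    }
    println!("{:?}", publisher.inner.sent);
}
```

Because `Dedup<P>` is itself a `Publisher`, further wrappers (metrics, retry) could wrap it in turn without the inner endpoint knowing.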
### Dependencies
- **tokio**: Async runtime
- **serde**: Serialization/deserialization
- **anyhow**: Error handling
- **tracing**: Structured logging
- **config**: Configuration management
- **async-trait**: Async trait support
- **uuid**: Unique identifiers (v4 and v7)
Optional (feature-gated):
- **rdkafka**: Kafka support
- **async-nats**: NATS support
- **lapin**: AMQP support
- **rumqttc**: MQTT support
- **mongodb**: MongoDB support
- **actix/reqwest**: HTTP support
- **sled**: Deduplication storage
- **metrics**: Metrics collection