amqp-bridge 0.2.12

# AMQP Message Bridge

A service that bridges messages between two AMQP instances with automatic reconnection and health checks.
It is also available as an extensible library crate, so other codebases can plug in a custom message transformer and run the bridge.

## Features

- ✅ Automatic reconnection with exponential backoff
- ✅ Continuous recovery loop (5s interval)
- ✅ Health check endpoints for monitoring
- ✅ Python-compatible JSON logging
- ✅ At-least-once delivery guarantee
- ✅ Zero message loss with publisher confirmations
- ✅ Extensible: plug in your own async message transformer

## Quick Start

### Prerequisites

- Rust 1.90+ (2024 edition)
- Two AMQP instances

### Installation

```bash
git clone https://github.com/bixority/amqp-bridge
cd amqp-bridge
cargo build --release
```

Alternatively, build a static Linux binary via Makefile (musl target):

```bash
make release
# output: target/amqp-bridge
```

### Configuration

Create a `.env` file:

```bash
# Required
SOURCE_DSN=amqp://user:password@source-host:5672/%2f
TARGET_DSN=amqp://user:password@target-host:5672/%2f

# Optional (with defaults)
SOURCE_QUEUE=old
TARGET_EXCHANGE=new_xchg
TARGET_ROUTING_KEY=update
HEALTH_PORT=8080
# LOG_FORMAT: json or pretty
LOG_FORMAT=json
# RUST_LOG: trace, debug, info, warn, error
RUST_LOG=info
```
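As an illustration of how the required and optional variables above could be resolved, here is a minimal sketch. `BridgeSettings` and `load_settings` are hypothetical names invented for this example; the crate's own `Config::from_env()` may be structured differently. Only the variable names and default values are taken from the listing above.

```rust
/// Hypothetical settings struct for illustration; not the crate's real `Config`.
#[derive(Debug, PartialEq)]
struct BridgeSettings {
    source_dsn: String,
    target_dsn: String,
    source_queue: String,
    target_exchange: String,
    target_routing_key: String,
    health_port: u16,
}

/// Resolve settings from any key/value lookup, applying the documented defaults.
/// Taking a lookup closure instead of reading the environment directly keeps
/// the function easy to test.
fn load_settings(get: impl Fn(&str) -> Option<String>) -> Result<BridgeSettings, String> {
    // Required variables produce an error when absent.
    let required =
        |key: &str| get(key).ok_or_else(|| format!("missing required variable {key}"));
    // Optional variables fall back to the default shown in the `.env` example.
    let optional = |key: &str, default: &str| get(key).unwrap_or_else(|| default.to_string());

    let health_port = optional("HEALTH_PORT", "8080")
        .parse::<u16>()
        .map_err(|e| format!("invalid HEALTH_PORT: {e}"))?;

    Ok(BridgeSettings {
        source_dsn: required("SOURCE_DSN")?,
        target_dsn: required("TARGET_DSN")?,
        source_queue: optional("SOURCE_QUEUE", "old"),
        target_exchange: optional("TARGET_EXCHANGE", "new_xchg"),
        target_routing_key: optional("TARGET_ROUTING_KEY", "update"),
        health_port,
    })
}
```

In a binary, the lookup would typically be `|key| std::env::var(key).ok()`.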

### Run

```bash
cargo run --release
# or
./target/release/amqp-bridge
```

## Health Endpoints

- `GET /healthz` - Liveness probe
- `GET /ready` - Readiness probe
- `GET /startup` - Startup probe

Health states: `Starting` → `Healthy` / `Unhealthy`

## Logging Formats

### JSON (Production)
```bash
LOG_FORMAT=json cargo run
```
Python-compatible structured logs for log aggregation systems.

### Pretty (Development)
```bash
LOG_FORMAT=pretty cargo run
```
Human-readable colored output.

## Message Flow

```
Source Queue → Message Bridge → Target Exchange
   Consume → Publish → Confirm → Acknowledge
```

**Guarantee**: A message is acknowledged on the source only after the target broker confirms the publish. Messages that fail to publish are nacked and requeued.
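The settlement rule behind this guarantee can be sketched as a small pure function. The `PublishOutcome` and `Settlement` types and the `settle` function are hypothetical, introduced only for this example; they are not part of the crate's API.

```rust
/// Result of publishing one message to the target (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum PublishOutcome {
    /// The target broker confirmed the publish.
    Confirmed,
    /// The target broker negatively confirmed the publish.
    Rejected,
    /// Publishing or waiting for the confirmation failed (e.g. connection lost).
    Failed,
}

/// What to tell the source broker about the consumed message (hypothetical type).
#[derive(Debug, PartialEq)]
enum Settlement {
    Ack,
    NackRequeue,
}

/// Acknowledge only on a confirmed publish; everything else is requeued.
fn settle(outcome: PublishOutcome) -> Settlement {
    match outcome {
        PublishOutcome::Confirmed => Settlement::Ack,
        PublishOutcome::Rejected | PublishOutcome::Failed => Settlement::NackRequeue,
    }
}
```

Because the acknowledgement happens after the confirmation, a failure between the two steps leads to redelivery rather than loss. This is why the guarantee is at-least-once: consumers on the target side should tolerate occasional duplicates.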

## Error Handling

### Connection Categories
- `connection_refused` - Service not running or firewall blocking
- `access_refused` - Authentication failures
- `timeout` - No response from server
- `dns_resolution` - Hostname resolution failed

### Recovery Behavior
1. Initial connection: 10 retries with exponential backoff (1s → 30s)
2. Connection loss: Reconnect every 5 seconds
3. Consumer errors: Mark unhealthy, trigger reconnection
4. Publish failures: Nack with requeue

## Podman

```bash
# Build
podman build -t amqp-bridge .

# Run
podman run -d \
  -e SOURCE_DSN="amqp://user:pass@source:5672/%2f" \
  -e TARGET_DSN="amqp://user:pass@target:5672/%2f" \
  -p 8080:8080 \
  amqp-bridge
```

### Compose file

```bash
podman compose --env-file .env up --build --remove-orphans
```

## Project Structure

```
src/
├── main.rs      # Entry point; delegates to recovery runner
├── bridge.rs    # Message bridging logic
├── conf.rs      # Configuration
├── health.rs    # Health endpoints
├── logging.rs   # Logging setup
└── transform.rs # Transformer trait and helper types
```

## Using as a Library (Extendable Crate)

You can depend on this crate and provide your own message transformation logic.

Implement a transformer and run the bridge:

```rust
use std::sync::Arc;
use anyhow::Result;
use amqp_bridge::{
    Config,
    HealthState,
    MessageBridge,
    MessageTransformer,
    Message,
};
use async_trait::async_trait;
use tokio::sync::RwLock;

struct MyTransformer;

#[async_trait]
impl MessageTransformer for MyTransformer {
    async fn transform(&self, input: Message) -> Result<Message> {
        // Example: uppercase the body if it's UTF-8
        let data = match String::from_utf8(input.data) {
            Ok(s) => s.to_uppercase().into_bytes(),
            Err(e) => e.into_bytes(),
        };
        Ok(Message { data, properties: input.properties })
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Load config from env the same way the binary does
    let config = Config::from_env()?;
    let health_state = Arc::new(RwLock::new(HealthState::default()));

    // Create bridge with optional transformer and run
    let transformer: Arc<dyn MessageTransformer> = Arc::new(MyTransformer);
    let bridge = MessageBridge::new(
        config.clone(),
        health_state.clone(),
        Some(transformer),
    ).await?;

    bridge.run().await?;
    Ok(())
}
```

Alternatively, use the convenience runners with auto-recovery and Ctrl+C handling:

```rust
use std::sync::Arc;
use amqp_bridge::{
    Config,
    HealthState,
    run_with_ctrl_c,
    MessageTransformer,
};
use tokio::sync::RwLock;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let config = Config::from_env()?;
    let health_state = Arc::new(RwLock::new(HealthState::default()));
    // `MyTransformer` is the transformer type defined in the previous example.
    let transformer: Arc<dyn MessageTransformer> = Arc::new(MyTransformer);
    // Pass Some(transformer) to enable transformation
    run_with_ctrl_c(config, health_state, Some(transformer)).await
}
```

Notes:
- The transformer runs for each consumed message before it is published to the target.
- On transformer error, the message is nacked with requeue to avoid loss.

Add this crate to your project (if not using a local path), for example via Git:

```toml
[dependencies]
amqp-bridge = { git = "https://github.com/bixority/amqp-bridge" }
```

To run with pass-through behavior (no transform), pass `None` instead of a transformer:

```rust
use std::sync::Arc;
use amqp_bridge::{Config, HealthState, run_with_ctrl_c};
use tokio::sync::RwLock;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let config = Config::from_env()?;
    let health_state = Arc::new(RwLock::new(HealthState::default()));
    // None means: do not transform; forward as-is
    run_with_ctrl_c(config, health_state, None).await
}
```

## Development

```bash
# Build
cargo build --release

# Test
cargo test

# Debug logging
RUST_LOG=debug cargo run
```

## Networking & Health Server

- The health server binds to 0.0.0.0:HEALTH_PORT (default 8080), exposing:
  - GET /healthz (liveness)
  - GET /ready (readiness)
  - GET /startup (startup)

This makes it suitable for container/Kubernetes probes out of the box.

## Troubleshooting

Check logs for error categories and hints:
- **connection_refused**: Verify AMQP is running and port is accessible
- **access_refused**: Check credentials and permissions
- **timeout**: Verify network connectivity
- **dns_resolution**: Check hostname or use IP address

Messages not appearing? Check:
1. Source queue has messages
2. Logs show `event="message_received"`
3. Target exchange exists
4. Routing key matches queue bindings

## Performance

- **QoS**: 1 (processes one message at a time)
- **Memory**: Minimal footprint
- **Recovery**: 5-second reconnection interval
- **Security**: Credentials sanitized in logs
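To illustrate what sanitizing credentials in logs can mean for a DSN, here is a minimal sketch that masks the password before the value is logged. `sanitize_dsn` is a hypothetical helper written for this example; the crate's actual sanitization may work differently. It assumes any `/` or `@` inside the credentials is percent-encoded, as URI syntax requires.

```rust
/// Replace the password in a DSN such as `amqp://user:password@host:5672/%2f`
/// with `***`. Hypothetical helper for illustration only.
fn sanitize_dsn(dsn: &str) -> String {
    // Split off the scheme, e.g. "amqp".
    let Some((scheme, rest)) = dsn.split_once("://") else {
        return dsn.to_string();
    };
    // The authority part ends at the first '/', which starts the vhost path.
    let authority_end = rest.find('/').unwrap_or(rest.len());
    let (authority, tail) = rest.split_at(authority_end);
    // Credentials, if present, precede the last '@' in the authority.
    let Some((userinfo, host)) = authority.rsplit_once('@') else {
        return dsn.to_string();
    };
    match userinfo.split_once(':') {
        Some((user, _password)) => format!("{scheme}://{user}:***@{host}{tail}"),
        // User name only: there is no password to mask.
        None => dsn.to_string(),
    }
}
```

For example, `amqp://user:password@source-host:5672/%2f` becomes `amqp://user:***@source-host:5672/%2f`, while a DSN without credentials is returned unchanged.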