queuerious-lapin 0.1.22

# queuerious-lapin

[Queuerious](https://queuerious.dev) SDK adapter for [lapin](https://crates.io/crates/lapin) (RabbitMQ).

Wraps `lapin::Consumer` to automatically report job lifecycle events (started, completed, failed, retrying) to the Queuerious platform — no manual instrumentation required.

## Installation

```toml
[dependencies]
queuerious = "0.1"
queuerious-lapin = "0.1"
```

### Feature flags

| Feature | Description | What you get |
|---------|-------------|-------------|
| _(none)_ | Lifecycle event tracking only | `TrackedConsumer`, `TrackedDelivery` |
| `metrics` | Queue metrics collection + agent | Counters, metrics collector, `ObservabilityRuntime` |
| `commands` | Remote command execution + agent | `RabbitMqCommandExecutor`, `ObservabilityRuntime` |
| `agent` | Convenience — enables `metrics` + `commands` | Everything above |
| `management` | RabbitMQ Management HTTP API (Tier 2 metrics) | Requires `metrics`; adds publish/deliver/ack rates, memory, unacked count |

Features can be combined freely, with one constraint: `management` requires `metrics`. Enabling `metrics` or `commands` (or both) unlocks `ObservabilityRuntime`, which wires up the agent, collectors, and executors in one call.
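The feature table can be read as a small set of rules. The sketch below models them in plain Rust so the combinations are easy to check; `Enabled` and `resolve` are hypothetical names for illustration and are not part of the crate. Treating `management` without `metrics` as an error is an assumption: the actual manifest may enable `metrics` implicitly instead.

```rust
use std::collections::BTreeSet;

/// Which optional pieces a feature selection would enable,
/// following the feature table (illustrative model only).
#[derive(Debug, PartialEq)]
pub struct Enabled {
    pub metrics: bool,
    pub commands: bool,
    pub management: bool,
    /// `ObservabilityRuntime` is described as available with `metrics` or `commands`.
    pub runtime: bool,
}

/// Expands `agent` into `metrics` + `commands` and rejects `management`
/// without `metrics`. Hypothetical helper, not the crate's API.
pub fn resolve(features: &[&str]) -> Result<Enabled, String> {
    let mut set: BTreeSet<&str> = features.iter().copied().collect();
    if set.contains("agent") {
        set.insert("metrics");
        set.insert("commands");
    }
    let metrics = set.contains("metrics");
    let commands = set.contains("commands");
    let management = set.contains("management");
    if management && !metrics {
        return Err("`management` requires `metrics`".to_string());
    }
    Ok(Enabled {
        metrics,
        commands,
        management,
        runtime: metrics || commands,
    })
}
```

For example, `resolve(&["agent"])` yields metrics, commands, and the runtime, while `resolve(&[])` yields lifecycle tracking only.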

## Quick start — lifecycle tracking only

No feature flags needed. Wrap your `lapin::Consumer` in a `TrackedConsumer` to automatically report job lifecycle events to Queuerious.

```rust
use queuerious::QueuriousClient;
use queuerious_lapin::TrackedConsumer;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .endpoint("https://api.queuerious.dev")
            .build()?
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel = conn.create_channel().await?;

    let consumer = channel.basic_consume(
        "my-queue", "my-consumer",
        lapin::options::BasicConsumeOptions::default(),
        lapin::types::FieldTable::default(),
    ).await?;

    let mut tracked = TrackedConsumer::new(consumer, "my-queue", client);

    while let Some(delivery) = tracked.next_tracked().await {
        let delivery = delivery?;
        // Process message...
        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
    }

    Ok(())
}
```
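To illustrate the wrapper idea behind `TrackedConsumer`, here is a minimal, dependency-free sketch. It is not the crate's implementation: `Tracked`, `TrackedItem`, `EventLog`, and the event strings are hypothetical, a plain iterator stands in for `lapin::Consumer`, and an in-memory log stands in for the Queuerious client.

```rust
use std::sync::{Arc, Mutex};

/// Shared, append-only event log standing in for the Queuerious client.
pub type EventLog = Arc<Mutex<Vec<String>>>;

/// Wraps any iterator of job ids and records a "started" event per item.
pub struct Tracked<I> {
    inner: I,
    log: EventLog,
}

/// An item handed out by `Tracked`; settling it records the outcome.
pub struct TrackedItem {
    pub id: u32,
    log: EventLog,
}

impl<I: Iterator<Item = u32>> Tracked<I> {
    pub fn new(inner: I, log: EventLog) -> Self {
        Tracked { inner, log }
    }
}

impl<I: Iterator<Item = u32>> Iterator for Tracked<I> {
    type Item = TrackedItem;

    fn next(&mut self) -> Option<TrackedItem> {
        let id = self.inner.next()?;
        self.log.lock().unwrap().push(format!("started:{}", id));
        Some(TrackedItem { id, log: Arc::clone(&self.log) })
    }
}

impl TrackedItem {
    /// Consumes the item, so it cannot be settled twice.
    pub fn ack(self) {
        self.log.lock().unwrap().push(format!("completed:{}", self.id));
    }

    /// Consumes the item and records a failure instead.
    pub fn fail(self) {
        self.log.lock().unwrap().push(format!("failed:{}", self.id));
    }
}
```

Because `ack` and `fail` take `self` by value, the caller's code stays the same as with an unwrapped consumer while each item reports exactly one outcome.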

## Metrics only

```toml
queuerious-lapin = { version = "0.1", features = ["metrics"] }
```

Enables queue metrics collection (throughput, success rate, processing latency, message depth) and spawns an agent that pushes them to Queuerious. No remote command execution.

```rust
use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .build()?
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));

    // Start the runtime — spawns an agent that collects and pushes metrics.
    let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
        .queue_names(vec!["emails".into(), "payments".into()])
        .window_seconds(60.0) // sliding window for rate calculations
        .start()
        .await?;

    // Create consumers through the runtime — counters are auto-attached.
    let consumer = conn.create_channel().await?
        .basic_consume("emails", "emails-consumer", Default::default(), Default::default())
        .await?;
    let mut tracked = runtime.tracked_consumer(consumer, "emails");

    while let Some(delivery) = tracked.next_tracked().await {
        let delivery = delivery?;
        // Process message...
        // Throughput, success rate, and latency are automatically recorded.
        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
    }

    runtime.shutdown().await?;
    Ok(())
}
```
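To make the `window_seconds` setting concrete, the following sketch shows one way rates could be derived from a sliding window of samples. It is an assumption-laden illustration, not the crate's `SlidingWindowCounters`: the `SlidingWindow`, `Sample`, and `Snapshot` types are hypothetical, and timestamps are passed in explicitly to keep the example deterministic.

```rust
use std::collections::VecDeque;

/// One finished job: completion time in seconds, outcome, processing time.
struct Sample {
    at_secs: f64,
    success: bool,
    processing_ms: f64,
}

/// Rates derived from the samples currently inside the window.
#[derive(Debug, PartialEq)]
pub struct Snapshot {
    pub throughput_per_sec: f64,
    pub success_rate: f64,
    pub avg_processing_ms: f64,
}

pub struct SlidingWindow {
    window_secs: f64,
    samples: VecDeque<Sample>,
}

impl SlidingWindow {
    pub fn new(window_secs: f64) -> Self {
        SlidingWindow { window_secs, samples: VecDeque::new() }
    }

    /// Records one finished job. Samples must arrive in time order.
    pub fn record(&mut self, at_secs: f64, success: bool, processing_ms: f64) {
        self.samples.push_back(Sample { at_secs, success, processing_ms });
    }

    /// Drops samples older than the window, then aggregates the rest.
    /// Returns `None` when the window is empty.
    pub fn snapshot(&mut self, now_secs: f64) -> Option<Snapshot> {
        let window = self.window_secs;
        while self
            .samples
            .front()
            .map_or(false, |s| now_secs - s.at_secs > window)
        {
            self.samples.pop_front();
        }
        if self.samples.is_empty() {
            return None;
        }
        let n = self.samples.len() as f64;
        let ok = self.samples.iter().filter(|s| s.success).count() as f64;
        let total_ms: f64 = self.samples.iter().map(|s| s.processing_ms).sum();
        Some(Snapshot {
            throughput_per_sec: n / window,
            success_rate: ok / n,
            avg_processing_ms: total_ms / n,
        })
    }
}
```

A longer window smooths the rates at the cost of reacting more slowly to changes; a shorter one does the opposite.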

### Three-tier metrics system

| Tier | Source | Data | Requires |
|------|--------|------|----------|
| 0 | `TrackedDelivery` event counters | throughput, success_rate, avg_processing_ms, error_rate | `metrics` feature |
| 1 | AMQP `queue_declare(passive=true)` | message_count, consumer_count | `metrics` feature |
| 2 | RabbitMQ Management HTTP API | publish_rate, deliver_rate, ack_rate, memory_bytes, unacked_count | `metrics` + `management` features |

To enable Tier 2:

```toml
queuerious-lapin = { version = "0.1", features = ["metrics", "management"] }
```

```rust
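use queuerious_lapin::ObservabilityRuntime;
use queuerious_lapin::metrics::ManagementApiConfig;

// `client` and `channel_factory` are set up as in the "Metrics only" example.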
let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
    .queue_names(vec!["emails".into()])
    .management_api(ManagementApiConfig {
        base_url: "http://localhost:15672".into(),
        username: "guest".into(),
        password: "guest".into(),
        vhost: "%2F".into(), // URL-encoded form of the default vhost "/"
    })
    .start()
    .await?;
```
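The `vhost` value `"%2F"` is the percent-encoded form of the default vhost `/`. The RabbitMQ Management HTTP API takes the vhost as a URL path segment (for example `/api/queues/{vhost}/{name}`), so a literal slash has to be escaped. If your vhost has a different name, a small helper along these lines could produce the encoded value; `percent_encode` and `queue_url` are hypothetical and not provided by the crate.

```rust
/// Percent-encodes every byte outside the RFC 3986 "unreserved" set,
/// which makes the result safe to use as a single URL path segment.
pub fn percent_encode(segment: &str) -> String {
    let mut out = String::with_capacity(segment.len());
    for byte in segment.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

/// Builds the Management API URL for a single queue.
pub fn queue_url(base_url: &str, vhost: &str, queue: &str) -> String {
    format!(
        "{}/api/queues/{}/{}",
        base_url.trim_end_matches('/'),
        percent_encode(vhost),
        percent_encode(queue)
    )
}
```

With the defaults from the example, `queue_url("http://localhost:15672", "/", "emails")` gives `http://localhost:15672/api/queues/%2F/emails`.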

## Commands only

```toml
queuerious-lapin = { version = "0.1", features = ["commands"] }
```

Enables remote command execution (pause, resume, purge, retry) and spawns an agent that polls for commands from the Queuerious dashboard. No metrics collection.

```rust
use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .build()?
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));

    // Start the runtime — spawns an agent that polls for and executes commands.
    let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
        .prefetch_count(10) // restored on resume after a pause command
        .start()
        .await?;

    // Supported commands (issued from the Queuerious dashboard):
    // - pause:  sets prefetch_count to 0, stopping message delivery
    // - resume: restores original prefetch_count
    // - purge:  removes all messages from a queue
    // - retry:  re-publishes a message to a queue

    // ... run your application ...

    runtime.shutdown().await?;
    Ok(())
}
```
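The four commands listed in the comments above can be pictured as operations on a small state machine. The sketch below models them against an in-memory queue, following those comments literally; `Command` and `QueueModel` are hypothetical and say nothing about how `RabbitMqCommandExecutor` talks to the broker.

```rust
use std::collections::VecDeque;

/// The commands named in the example comments.
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
    Pause,
    Resume,
    Purge,
    /// Carries the message body to re-publish.
    Retry(String),
}

/// In-memory stand-in for one queue and its consumer settings.
pub struct QueueModel {
    pub prefetch_count: u16,
    configured_prefetch: u16,
    pub messages: VecDeque<String>,
}

impl QueueModel {
    pub fn new(prefetch_count: u16) -> Self {
        QueueModel {
            prefetch_count,
            configured_prefetch: prefetch_count,
            messages: VecDeque::new(),
        }
    }

    /// Applies one command and returns how many messages it affected.
    pub fn apply(&mut self, cmd: Command) -> usize {
        match cmd {
            Command::Pause => {
                self.prefetch_count = 0;
                0
            }
            Command::Resume => {
                // Restores the value the model was configured with.
                self.prefetch_count = self.configured_prefetch;
                0
            }
            Command::Purge => {
                let removed = self.messages.len();
                self.messages.clear();
                removed
            }
            Command::Retry(body) => {
                self.messages.push_back(body);
                1
            }
        }
    }
}
```

Keeping the configured prefetch separate from the current one is what lets `Resume` work after any number of `Pause` commands.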

## Full agent — metrics + commands

```toml
queuerious-lapin = { version = "0.1", features = ["agent"] }
# or, equivalently (use one of the two lines, not both):
# queuerious-lapin = { version = "0.1", features = ["metrics", "commands"] }
```

Enables both metrics collection and remote command execution.

```rust
use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .build()?
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));

    let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
        .queue_names(vec!["emails".into(), "payments".into()])
        .prefetch_count(10)
        .start()
        .await?;

    // Consumers get counters auto-attached for metrics.
    let consumer = conn.create_channel().await?
        .basic_consume("emails", "emails-consumer", Default::default(), Default::default())
        .await?;
    let mut tracked = runtime.tracked_consumer(consumer, "emails");

    while let Some(delivery) = tracked.next_tracked().await {
        let delivery = delivery?;
        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
    }

    runtime.shutdown().await?;
    Ok(())
}
```

## Manual setup (without `ObservabilityRuntime`)

If you need more control, you can wire the components yourself instead of using the runtime:

```rust
use queuerious::{QueuriousClient, Agent, AgentConfig};
use queuerious_lapin::TrackedConsumer;
use queuerious_lapin::metrics::{RabbitMqMetricsCollector, SlidingWindowCounters};
use queuerious_lapin::RabbitMqCommandExecutor;
use std::collections::HashMap;
use std::sync::Arc;

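// Assumes `client`, `connection`, and `consumer` are already set up
// as in the earlier examples.

// 1. Create shared counters for each queue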
let mut counters = HashMap::new();
counters.insert("emails".into(), SlidingWindowCounters::new());
counters.insert("payments".into(), SlidingWindowCounters::new());
let counters = Arc::new(counters);

// 2. Build the metrics collector (channel factory enables auto-reconnect)
let channel_factory: Arc<dyn queuerious_lapin::ChannelFactory> = Arc::new(
    queuerious_lapin::StaticConnectionFactory::new(connection)
);
// Clone the Arc here so `channel_factory` can still be used in step 3.
let collector = RabbitMqMetricsCollector::builder(counters.clone(), channel_factory.clone())
    .window_seconds(60.0)
    .build();

// 3. Build the command executor (uses channel factory for auto-reconnect)
let executor = RabbitMqCommandExecutor::new(channel_factory.clone(), 10);

// 4. Build and spawn the agent
let agent = client.agent_builder()
    .metrics_collector(Arc::new(collector))
    .command_executor(Arc::new(executor))
    .build()?;
let (handle, shutdown_tx) = agent.spawn();

// 5. Attach counters to each tracked consumer
let mut tracked = TrackedConsumer::new(consumer, "emails", client.clone())
    .with_counters(counters.clone());
```
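The manual setup relies on one map of counters being shared between the consumers that update it and the collector that reads it. The sketch below shows that sharing pattern with the standard library only. It is an illustration under assumptions: `Counters`, `SharedCounters`, and `run_consumers` are hypothetical, and the crate's own counter type may be structured differently.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Per-queue counters with interior mutability, so a shared
/// reference is enough to update them.
#[derive(Default)]
pub struct Counters {
    pub completed: AtomicU64,
    pub failed: AtomicU64,
}

/// The map itself is immutable once built; only the atomics change.
pub type SharedCounters = Arc<HashMap<String, Counters>>;

pub fn shared_counters(queues: &[&str]) -> SharedCounters {
    Arc::new(
        queues
            .iter()
            .map(|q| (q.to_string(), Counters::default()))
            .collect(),
    )
}

/// Simulates `workers` consumers on separate threads, each settling
/// `jobs_each` jobs where every tenth job fails.
/// Panics if `queue` was not registered in `shared_counters`.
pub fn run_consumers(counters: &SharedCounters, queue: &str, workers: usize, jobs_each: u64) {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let counters = Arc::clone(counters);
            let queue = queue.to_string();
            thread::spawn(move || {
                let c = &counters[queue.as_str()];
                for job in 0..jobs_each {
                    if job % 10 == 0 {
                        c.failed.fetch_add(1, Ordering::Relaxed);
                    } else {
                        c.completed.fetch_add(1, Ordering::Relaxed);
                    }
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

Because the set of queues is fixed up front, the map needs no lock: each clone of the `Arc` sees the same atomics, which is why the counters are created before the collector and the consumers.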

## License

Licensed under either of

- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)

at your option.