queuerious-lapin 0.1.2

Queuerious SDK adapter for lapin (RabbitMQ)
Documentation

queuerious-lapin

Queuerious SDK adapter for lapin (RabbitMQ).

Wraps lapin::Consumer to automatically report job lifecycle events (started, completed, failed, retrying) to the Queuerious platform — no manual instrumentation required.

Installation

[dependencies]
queuerious = "0.1"
queuerious-lapin = "0.1"

Feature flags

Feature Description What you get
(none) Lifecycle event tracking only TrackedConsumer, TrackedDelivery
metrics Queue metrics collection + agent Counters, metrics collector, ObservabilityRuntime
commands Remote command execution + agent RabbitMqCommandExecutor, ObservabilityRuntime
agent Convenience — enables metrics + commands Everything above
management RabbitMQ Management HTTP API (Tier 2 metrics) Requires metrics; adds publish/deliver/ack rates, memory, unacked count

Features can be combined freely. Enabling metrics or commands (or both) unlocks the ObservabilityRuntime which wires up the agent, collectors, and executors in one call.

Quick start — lifecycle tracking only

No feature flags needed. Wrap your lapin::Consumer in a TrackedConsumer to automatically report job lifecycle events to Queuerious.

use queuerious::QueuriousClient;
use queuerious_lapin::TrackedConsumer;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .endpoint("https://api.queuerious.dev")
            .build()?
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel = conn.create_channel().await?;

    let consumer = channel.basic_consume(
        "my-queue", "my-consumer",
        lapin::options::BasicConsumeOptions::default(),
        lapin::types::FieldTable::default(),
    ).await?;

    let mut tracked = TrackedConsumer::new(consumer, "my-queue", client);

    while let Some(delivery) = tracked.next_tracked().await {
        let delivery = delivery?;
        // Process message...
        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
    }

    Ok(())
}

Metrics only

queuerious-lapin = { version = "0.1", features = ["metrics"] }

Enables queue metrics collection (throughput, success rate, processing latency, message depth) and spawns an agent that pushes them to Queuerious. No remote command execution.

use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .build()?
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));

    // Start the runtime — spawns an agent that collects and pushes metrics.
    let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
        .queue_names(vec!["emails".into(), "payments".into()])
        .window_seconds(60.0) // sliding window for rate calculations
        .start()
        .await?;

    // Create consumers through the runtime — counters are auto-attached.
    let consumer = conn.create_channel().await?
        .basic_consume("emails", "emails-consumer", Default::default(), Default::default())
        .await?;
    let mut tracked = runtime.tracked_consumer(consumer, "emails");

    while let Some(delivery) = tracked.next_tracked().await {
        let delivery = delivery?;
        // Process message...
        // Throughput, success rate, and latency are automatically recorded.
        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
    }

    runtime.shutdown().await?;
    Ok(())
}

Three-tier metrics system

Tier Source Data Requires
0 TrackedDelivery event counters throughput, success_rate, avg_processing_ms, error_rate metrics feature
1 AMQP queue_declare(passive=true) message_count, consumer_count metrics feature
2 RabbitMQ Management HTTP API publish_rate, deliver_rate, ack_rate, memory_bytes, unacked_count metrics + management features

To enable Tier 2:

queuerious-lapin = { version = "0.1", features = ["metrics", "management"] }
use queuerious_lapin::metrics::ManagementApiConfig;

let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
    .queue_names(vec!["emails".into()])
    .management_api(ManagementApiConfig {
        base_url: "http://localhost:15672".into(),
        username: "guest".into(),
        password: "guest".into(),
        vhost: "%2F".into(),
    })
    .start()
    .await?;

Commands only

queuerious-lapin = { version = "0.1", features = ["commands"] }

Enables remote command execution (pause, resume, purge, retry) and spawns an agent that polls for commands from the Queuerious dashboard. No metrics collection.

use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .build()?
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));

    // Start the runtime — spawns an agent that polls for and executes commands.
    let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
        .prefetch_count(10) // restored on resume after a pause command
        .start()
        .await?;

    // Supported commands (issued from the Queuerious dashboard):
    // - pause:  sets prefetch_count to 0, stopping message delivery
    // - resume: restores original prefetch_count
    // - purge:  removes all messages from a queue
    // - retry:  re-publishes a message to a queue

    // ... run your application ...

    runtime.shutdown().await?;
    Ok(())
}

Full agent — metrics + commands

queuerious-lapin = { version = "0.1", features = ["agent"] }
# or equivalently:
queuerious-lapin = { version = "0.1", features = ["metrics", "commands"] }

Enables both metrics collection and remote command execution.

use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .build()?
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));

    let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
        .queue_names(vec!["emails".into(), "payments".into()])
        .prefetch_count(10)
        .start()
        .await?;

    // Consumers get counters auto-attached for metrics.
    let consumer = conn.create_channel().await?
        .basic_consume("emails", "emails-consumer", Default::default(), Default::default())
        .await?;
    let mut tracked = runtime.tracked_consumer(consumer, "emails");

    while let Some(delivery) = tracked.next_tracked().await {
        let delivery = delivery?;
        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
    }

    runtime.shutdown().await?;
    Ok(())
}

Manual setup (without ObservabilityRuntime)

If you need more control, you can wire the components yourself instead of using the runtime:

use queuerious::{QueuriousClient, Agent, AgentConfig};
use queuerious_lapin::TrackedConsumer;
use queuerious_lapin::metrics::{RabbitMqMetricsCollector, SlidingWindowCounters};
use queuerious_lapin::RabbitMqCommandExecutor;
use std::collections::HashMap;
use std::sync::Arc;

// 1. Create shared counters for each queue
let mut counters = HashMap::new();
counters.insert("emails".into(), SlidingWindowCounters::new());
counters.insert("payments".into(), SlidingWindowCounters::new());
let counters = Arc::new(counters);

// 2. Build the metrics collector (channel factory enables auto-reconnect)
let channel_factory: Arc<dyn queuerious_lapin::ChannelFactory> = Arc::new(
    queuerious_lapin::StaticConnectionFactory::new(connection)
);
let collector = RabbitMqMetricsCollector::builder(counters.clone(), channel_factory)
    .window_seconds(60.0)
    .build();

// 3. Build the command executor (uses channel factory for auto-reconnect)
let executor = RabbitMqCommandExecutor::new(channel_factory.clone(), 10);

// 4. Build and spawn the agent
let agent = client.agent_builder()
    .metrics_collector(Arc::new(collector))
    .command_executor(Arc::new(executor))
    .build()?;
let (handle, shutdown_tx) = agent.spawn();

// 5. Attach counters to each tracked consumer
let mut tracked = TrackedConsumer::new(consumer, "emails", client.clone())
    .with_counters(counters.clone());

License

Licensed under either of

at your option.