queuerious-lapin
Queuerious SDK adapter for lapin (RabbitMQ).
Wraps lapin::Consumer to automatically report job lifecycle events (started, completed, failed, retrying) to the Queuerious platform — no manual instrumentation required.
Installation
[dependencies]
queuerious = "0.1"
queuerious-lapin = "0.1"
Feature flags
| Feature |
Description |
What you get |
| (none) |
Lifecycle event tracking only |
TrackedConsumer, TrackedDelivery |
metrics |
Queue metrics collection + agent |
Counters, metrics collector, ObservabilityRuntime |
commands |
Remote command execution + agent |
RabbitMqCommandExecutor, ObservabilityRuntime |
agent |
Convenience — enables metrics + commands |
Everything above |
management |
RabbitMQ Management HTTP API (Tier 2 metrics) |
Requires metrics; adds publish/deliver/ack rates, memory, unacked count |
Features can be combined freely. Enabling metrics or commands (or both) unlocks the ObservabilityRuntime which wires up the agent, collectors, and executors in one call.
Quick start — lifecycle tracking only
No feature flags needed. Wrap your lapin::Consumer in a TrackedConsumer to automatically report job lifecycle events to Queuerious.
use queuerious::QueuriousClient;
use queuerious_lapin::TrackedConsumer;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(
QueuriousClient::builder()
.api_key("qk_your_key")
.endpoint("https://api.queuerious.dev")
.build()?
);
let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
let channel = conn.create_channel().await?;
let consumer = channel.basic_consume(
"my-queue", "my-consumer",
lapin::options::BasicConsumeOptions::default(),
lapin::types::FieldTable::default(),
).await?;
let mut tracked = TrackedConsumer::new(consumer, "my-queue", client);
while let Some(delivery) = tracked.next_tracked().await {
let delivery = delivery?;
delivery.ack(lapin::options::BasicAckOptions::default()).await?;
}
Ok(())
}
Metrics only
queuerious-lapin = { version = "0.1", features = ["metrics"] }
Enables queue metrics collection (throughput, success rate, processing latency, message depth) and spawns an agent that pushes them to Queuerious. No remote command execution.
use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(
QueuriousClient::builder()
.api_key("qk_your_key")
.build()?
);
let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));
let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
.queue_names(vec!["emails".into(), "payments".into()])
.window_seconds(60.0) .start()
.await?;
let consumer = conn.create_channel().await?
.basic_consume("emails", "emails-consumer", Default::default(), Default::default())
.await?;
let mut tracked = runtime.tracked_consumer(consumer, "emails");
while let Some(delivery) = tracked.next_tracked().await {
let delivery = delivery?;
delivery.ack(lapin::options::BasicAckOptions::default()).await?;
}
runtime.shutdown().await?;
Ok(())
}
Three-tier metrics system
| Tier |
Source |
Data |
Requires |
| 0 |
TrackedDelivery event counters |
throughput, success_rate, avg_processing_ms, error_rate |
metrics feature |
| 1 |
AMQP queue_declare(passive=true) |
message_count, consumer_count |
metrics feature |
| 2 |
RabbitMQ Management HTTP API |
publish_rate, deliver_rate, ack_rate, memory_bytes, unacked_count |
metrics + management features |
To enable Tier 2:
queuerious-lapin = { version = "0.1", features = ["metrics", "management"] }
use queuerious_lapin::metrics::ManagementApiConfig;
let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
.queue_names(vec!["emails".into()])
.management_api(ManagementApiConfig {
base_url: "http://localhost:15672".into(),
username: "guest".into(),
password: "guest".into(),
vhost: "%2F".into(),
})
.start()
.await?;
Commands only
queuerious-lapin = { version = "0.1", features = ["commands"] }
Enables remote command execution (pause, resume, purge, retry) and spawns an agent that polls for commands from the Queuerious dashboard. No metrics collection.
use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(
QueuriousClient::builder()
.api_key("qk_your_key")
.build()?
);
let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));
let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
.prefetch_count(10) .start()
.await?;
runtime.shutdown().await?;
Ok(())
}
Full agent — metrics + commands
queuerious-lapin = { version = "0.1", features = ["agent"] }
queuerious-lapin = { version = "0.1", features = ["metrics", "commands"] }
Enables both metrics collection and remote command execution.
use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(
QueuriousClient::builder()
.api_key("qk_your_key")
.build()?
);
let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));
let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
.queue_names(vec!["emails".into(), "payments".into()])
.prefetch_count(10)
.start()
.await?;
let consumer = conn.create_channel().await?
.basic_consume("emails", "emails-consumer", Default::default(), Default::default())
.await?;
let mut tracked = runtime.tracked_consumer(consumer, "emails");
while let Some(delivery) = tracked.next_tracked().await {
let delivery = delivery?;
delivery.ack(lapin::options::BasicAckOptions::default()).await?;
}
runtime.shutdown().await?;
Ok(())
}
Manual setup (without ObservabilityRuntime)
If you need more control, you can wire the components yourself instead of using the runtime:
use queuerious::{QueuriousClient, Agent, AgentConfig};
use queuerious_lapin::TrackedConsumer;
use queuerious_lapin::metrics::{RabbitMqMetricsCollector, SlidingWindowCounters};
use queuerious_lapin::RabbitMqCommandExecutor;
use std::collections::HashMap;
use std::sync::Arc;
let mut counters = HashMap::new();
counters.insert("emails".into(), SlidingWindowCounters::new());
counters.insert("payments".into(), SlidingWindowCounters::new());
let counters = Arc::new(counters);
let channel_factory: Arc<dyn queuerious_lapin::ChannelFactory> = Arc::new(
queuerious_lapin::StaticConnectionFactory::new(connection)
);
let collector = RabbitMqMetricsCollector::builder(counters.clone(), channel_factory)
.window_seconds(60.0)
.build();
let executor = RabbitMqCommandExecutor::new(channel_factory.clone(), 10);
let agent = client.agent_builder()
.metrics_collector(Arc::new(collector))
.command_executor(Arc::new(executor))
.build()?;
let (handle, shutdown_tx) = agent.spawn();
let mut tracked = TrackedConsumer::new(consumer, "emails", client.clone())
.with_counters(counters.clone());
License
Licensed under either of
at your option.