# queuerious-lapin
[Queuerious](https://queuerious.dev) SDK adapter for [lapin](https://crates.io/crates/lapin) (RabbitMQ).
Wraps `lapin::Consumer` to automatically report job lifecycle events (started, completed, failed, retrying) to the Queuerious platform — no manual instrumentation required.
## Installation
```toml
[dependencies]
queuerious = "0.1"
queuerious-lapin = "0.1"
```
### Feature flags
| Feature | Description | Adds |
|---|---|---|
| _(none)_ | Lifecycle event tracking only | `TrackedConsumer`, `TrackedDelivery` |
| `metrics` | Queue metrics collection + agent | Counters, metrics collector, `ObservabilityRuntime` |
| `commands` | Remote command execution + agent | `RabbitMqCommandExecutor`, `ObservabilityRuntime` |
| `agent` | Convenience: enables `metrics` + `commands` | Everything above |
| `management` | RabbitMQ Management HTTP API (Tier 2 metrics) | Requires `metrics`; adds publish/deliver/ack rates, memory, unacked count |
Features can be combined freely, with one constraint: `management` requires `metrics`. Enabling `metrics` or `commands` (or both) unlocks `ObservabilityRuntime`, which wires up the agent, collectors, and executors in one call.
## Quick start — lifecycle tracking only
No feature flags needed. Wrap your `lapin::Consumer` in a `TrackedConsumer` to automatically report job lifecycle events to Queuerious.
```rust
use queuerious::QueuriousClient;
use queuerious_lapin::TrackedConsumer;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .endpoint("https://api.queuerious.dev")
            .build()?,
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel = conn.create_channel().await?;
    let consumer = channel
        .basic_consume(
            "my-queue",
            "my-consumer",
            lapin::options::BasicConsumeOptions::default(),
            lapin::types::FieldTable::default(),
        )
        .await?;

    let mut tracked = TrackedConsumer::new(consumer, "my-queue", client);
    while let Some(delivery) = tracked.next_tracked().await {
        let delivery = delivery?;
        // Process message...
        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
    }

    Ok(())
}
```
## Metrics only
```toml
queuerious-lapin = { version = "0.1", features = ["metrics"] }
```
Enables queue metrics collection (throughput, success rate, processing latency, message depth) and spawns an agent that pushes them to Queuerious. No remote command execution.
```rust
use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .build()?,
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));

    // Start the runtime: spawns an agent that collects and pushes metrics.
    let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
        .queue_names(vec!["emails".into(), "payments".into()])
        .window_seconds(60.0) // sliding window for rate calculations
        .start()
        .await?;

    // Create consumers through the runtime so counters are auto-attached.
    let consumer = conn
        .create_channel()
        .await?
        .basic_consume("emails", "emails-consumer", Default::default(), Default::default())
        .await?;
    let mut tracked = runtime.tracked_consumer(consumer, "emails");

    while let Some(delivery) = tracked.next_tracked().await {
        let delivery = delivery?;
        // Process message...
        // Throughput, success rate, and latency are automatically recorded.
        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
    }

    runtime.shutdown().await?;
    Ok(())
}
```
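To make the `window_seconds(60.0)` setting concrete, here is a minimal std-only sketch of a sliding-window counter. `WindowCounter` and its methods are hypothetical names for illustration, not the crate's types, and the sketch assumes events are recorded in chronological order.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical per-queue counter; illustrates the idea, not the crate's implementation.
pub struct WindowCounter {
    window: Duration,
    // (completion time, succeeded?, processing time in ms), oldest first
    events: VecDeque<(Instant, bool, f64)>,
}

impl WindowCounter {
    pub fn new(window_seconds: f64) -> Self {
        Self {
            window: Duration::from_secs_f64(window_seconds),
            events: VecDeque::new(),
        }
    }

    /// Record one finished job.
    pub fn record(&mut self, at: Instant, success: bool, processing_ms: f64) {
        self.events.push_back((at, success, processing_ms));
    }

    /// Drop events older than the window that ends at `now`.
    fn evict(&mut self, now: Instant) {
        while let Some(&(at, _, _)) = self.events.front() {
            if now.saturating_duration_since(at) > self.window {
                self.events.pop_front();
            } else {
                break;
            }
        }
    }

    /// Jobs per second, averaged over the whole window.
    pub fn throughput(&mut self, now: Instant) -> f64 {
        self.evict(now);
        self.events.len() as f64 / self.window.as_secs_f64()
    }

    /// Fraction of jobs in the window that succeeded; `None` if the window is empty.
    pub fn success_rate(&mut self, now: Instant) -> Option<f64> {
        self.evict(now);
        if self.events.is_empty() {
            return None;
        }
        let ok = self.events.iter().filter(|e| e.1).count();
        Some(ok as f64 / self.events.len() as f64)
    }

    /// Mean processing time in the window; `None` if the window is empty.
    pub fn avg_processing_ms(&mut self, now: Instant) -> Option<f64> {
        self.evict(now);
        if self.events.is_empty() {
            return None;
        }
        let sum: f64 = self.events.iter().map(|e| e.2).sum();
        Some(sum / self.events.len() as f64)
    }
}
```

With a 60-second window, a job recorded 85 seconds ago no longer contributes to any of the three values, while one recorded 55 seconds ago still does.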
### Three-tier metrics system
| Tier | Source | Metrics | Requires |
|---|---|---|---|
| 0 | `TrackedDelivery` event counters | throughput, success_rate, avg_processing_ms, error_rate | `metrics` feature |
| 1 | AMQP `queue_declare(passive=true)` | message_count, consumer_count | `metrics` feature |
| 2 | RabbitMQ Management HTTP API | publish_rate, deliver_rate, ack_rate, memory_bytes, unacked_count | `metrics` + `management` features |
To enable Tier 2:
```toml
queuerious-lapin = { version = "0.1", features = ["metrics", "management"] }
```
```rust
use queuerious_lapin::metrics::ManagementApiConfig;
use queuerious_lapin::ObservabilityRuntime;

// `client` and `channel_factory` are created as in the "Metrics only" example.
let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
    .queue_names(vec!["emails".into()])
    .management_api(ManagementApiConfig {
        base_url: "http://localhost:15672".into(),
        username: "guest".into(),
        password: "guest".into(),
        vhost: "%2F".into(), // URL-encoded "/" (the default vhost)
    })
    .start()
    .await?;
```
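The `vhost` value above is `%2F` because the Management HTTP API addresses queues as `/api/queues/{vhost}/{name}`, so a vhost named `/` has to be percent-encoded to fit in a single path segment. The helpers below are a hypothetical, std-only sketch of producing such a value for other vhost names; they are not part of this crate, and whether the crate builds its URLs exactly this way is an assumption.

```rust
/// Percent-encode one URL path segment.
/// RFC 3986 unreserved characters pass through; every other byte becomes %XX.
pub fn encode_path_segment(segment: &str) -> String {
    let mut out = String::with_capacity(segment.len());
    for byte in segment.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            other => out.push_str(&format!("%{:02X}", other)),
        }
    }
    out
}

/// Build the Management API URL for one queue (hypothetical helper).
pub fn queue_url(base_url: &str, vhost: &str, queue: &str) -> String {
    format!(
        "{}/api/queues/{}/{}",
        base_url.trim_end_matches('/'),
        encode_path_segment(vhost),
        encode_path_segment(queue)
    )
}
```

For the default vhost, `encode_path_segment("/")` yields `%2F`, which is the string passed to `ManagementApiConfig` in the example above.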
## Commands only
```toml
queuerious-lapin = { version = "0.1", features = ["commands"] }
```
Enables remote command execution (pause, resume, purge, retry) and spawns an agent that polls for commands from the Queuerious dashboard. No metrics collection.
```rust
use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .build()?,
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));

    // Start the runtime: spawns an agent that polls for and executes commands.
    let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
        .prefetch_count(10) // restored on resume after a pause command
        .start()
        .await?;

    // Supported commands (issued from the Queuerious dashboard):
    // - pause: sets prefetch_count to 0, stopping message delivery
    // - resume: restores original prefetch_count
    // - purge: removes all messages from a queue
    // - retry: re-publishes a message to a queue

    // ... run your application ...

    runtime.shutdown().await?;
    Ok(())
}
```
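The pause and resume comments above imply that the executor remembers the configured prefetch value so it can restore it later. A minimal sketch of that bookkeeping follows; `PrefetchState` and `Command` are hypothetical names, not the crate's types, and the sketch models only the value described above, not the AMQP call that applies it.

```rust
/// The two commands that change the prefetch value (purge and retry do not).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Command {
    Pause,
    Resume,
}

/// Hypothetical bookkeeping for pause/resume; not the crate's real executor.
pub struct PrefetchState {
    configured: u16, // value passed to `prefetch_count(..)` at startup
    paused: bool,
}

impl PrefetchState {
    pub fn new(configured: u16) -> Self {
        Self { configured, paused: false }
    }

    /// Apply a command and return the prefetch value that should now be in effect.
    pub fn apply(&mut self, command: Command) -> u16 {
        self.paused = matches!(command, Command::Pause);
        self.effective()
    }

    /// 0 while paused, otherwise the originally configured value.
    pub fn effective(&self) -> u16 {
        if self.paused { 0 } else { self.configured }
    }
}
```

Keeping the configured value separate from the paused flag makes both commands idempotent: pausing twice and then resuming once still restores the original value.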
## Full agent — metrics + commands
```toml
queuerious-lapin = { version = "0.1", features = ["agent"] }
# or equivalently (use one of the two lines, not both):
# queuerious-lapin = { version = "0.1", features = ["metrics", "commands"] }
```
Enables both metrics collection and remote command execution.
```rust
use queuerious::QueuriousClient;
use queuerious_lapin::{ObservabilityRuntime, StaticConnectionFactory};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(
        QueuriousClient::builder()
            .api_key("qk_your_key")
            .build()?,
    );

    let conn = lapin::Connection::connect("amqp://localhost:5672", Default::default()).await?;
    let channel_factory = Arc::new(StaticConnectionFactory::new(conn.clone()));

    let runtime = ObservabilityRuntime::builder(client.clone(), channel_factory)
        .queue_names(vec!["emails".into(), "payments".into()])
        .prefetch_count(10)
        .start()
        .await?;

    // Consumers get counters auto-attached for metrics.
    let consumer = conn
        .create_channel()
        .await?
        .basic_consume("emails", "emails-consumer", Default::default(), Default::default())
        .await?;
    let mut tracked = runtime.tracked_consumer(consumer, "emails");

    while let Some(delivery) = tracked.next_tracked().await {
        let delivery = delivery?;
        delivery.ack(lapin::options::BasicAckOptions::default()).await?;
    }

    runtime.shutdown().await?;
    Ok(())
}
```
## Manual setup (without `ObservabilityRuntime`)
If you need more control, you can wire the components yourself instead of using the runtime:
```rust
use queuerious::{QueuriousClient, Agent, AgentConfig};
use queuerious_lapin::TrackedConsumer;
use queuerious_lapin::metrics::{RabbitMqMetricsCollector, SlidingWindowCounters};
use queuerious_lapin::RabbitMqCommandExecutor;
use std::collections::HashMap;
use std::sync::Arc;

// `client`, `connection`, and `consumer` are assumed to be set up as in the
// earlier examples.

// 1. Create shared counters for each queue
let mut counters = HashMap::new();
counters.insert("emails".into(), SlidingWindowCounters::new());
counters.insert("payments".into(), SlidingWindowCounters::new());
let counters = Arc::new(counters);

// 2. Build the metrics collector (channel factory enables auto-reconnect)
let channel_factory: Arc<dyn queuerious_lapin::ChannelFactory> = Arc::new(
    queuerious_lapin::StaticConnectionFactory::new(connection)
);
// Clone the Arc here so the factory can still be used in step 3.
let collector = RabbitMqMetricsCollector::builder(counters.clone(), channel_factory.clone())
    .window_seconds(60.0)
    .build();

// 3. Build the command executor (uses channel factory for auto-reconnect)
let executor = RabbitMqCommandExecutor::new(channel_factory.clone(), 10);

// 4. Build and spawn the agent
let agent = client.agent_builder()
    .metrics_collector(Arc::new(collector))
    .command_executor(Arc::new(executor))
    .build()?;
let (handle, shutdown_tx) = agent.spawn();

// 5. Attach counters to each tracked consumer
let mut tracked = TrackedConsumer::new(consumer, "emails", client.clone())
    .with_counters(counters.clone());
```
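Steps 1 and 5 share a single `Arc<HashMap<..>>` between the collector and every consumer, which only works if the counters can be updated through a shared reference. The std-only sketch below shows one way that can be done, using atomics; `DemoCounters` and `simulate` are hypothetical and say nothing about how `SlidingWindowCounters` is actually implemented.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical counters with interior atomics; a stand-in, not the crate's type.
#[derive(Default)]
pub struct DemoCounters {
    completed: AtomicU64,
    failed: AtomicU64,
}

impl DemoCounters {
    /// Record one job outcome through a shared reference.
    pub fn record(&self, success: bool) {
        let counter = if success { &self.completed } else { &self.failed };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns (completed, failed).
    pub fn totals(&self) -> (u64, u64) {
        (
            self.completed.load(Ordering::Relaxed),
            self.failed.load(Ordering::Relaxed),
        )
    }
}

/// Run `workers` threads that each record `jobs` outcomes on one queue.
/// Every fourth job (index 0, 4, 8, ...) is recorded as a failure.
pub fn simulate(queue: &str, workers: usize, jobs: u64) -> (u64, u64) {
    // Build the map once, then freeze it behind an Arc, as in step 1.
    let mut map = HashMap::new();
    map.insert(queue.to_string(), DemoCounters::default());
    let counters = Arc::new(map);

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let counters = Arc::clone(&counters);
            let queue = queue.to_string();
            thread::spawn(move || {
                let entry = counters.get(queue.as_str()).expect("queue registered");
                for i in 0..jobs {
                    entry.record(i % 4 != 0);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker panicked");
    }
    counters.get(queue).expect("queue registered").totals()
}
```

Because the map itself is never mutated after being wrapped in the `Arc`, no lock is needed around it; only the per-queue counters change.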
## License
Licensed under either of
- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)
at your option.