use std::sync::Arc;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use shove::rabbitmq::{
ConsumerGroupRegistry, ManagementConfig, RabbitMqAutoscalerBackend, RabbitMqClient,
RabbitMqConfig, RabbitMqConsumerGroupConfig, RabbitMqPublisher,
};
use shove::{
AutoscalerConfig, MessageHandler, MessageMetadata, Outcome, Topic, TopologyBuilder,
define_topic,
};
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::rabbitmq::RabbitMq as RabbitMqImage;
use tokio::sync::Mutex;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TaskEvent {
task_id: String,
payload: String,
}
define_topic!(
WorkQueue,
TaskEvent,
TopologyBuilder::new("ex-work-queue")
.hold_queue(Duration::from_secs(5))
.dlq()
.build()
);
#[derive(Clone)]
struct TaskHandler;
impl MessageHandler<WorkQueue> for TaskHandler {
type Context = ();
async fn handle(&self, msg: TaskEvent, metadata: MessageMetadata, _: &()) -> Outcome {
println!(
"[worker] task={} attempt={}",
msg.task_id,
metadata.retry_count + 1,
);
tokio::time::sleep(Duration::from_millis(500)).await;
Outcome::Ack
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "shove=debug,consumer_groups=debug".parse().unwrap()),
)
.init();
let container = RabbitMqImage::default().start().await?;
let amqp_port = container.get_host_port_ipv4(5672).await?;
let mgmt_port = container.get_host_port_ipv4(15672).await?;
let uri = format!("amqp://guest:guest@localhost:{amqp_port}/%2f");
let config = RabbitMqConfig::new(&uri);
let client = RabbitMqClient::connect(&config).await?;
let publisher = RabbitMqPublisher::new(client.clone()).await?;
let burst_size = 100;
for i in 0..burst_size {
let event = TaskEvent {
task_id: format!("TASK-{i:03}"),
payload: format!("work item {i}"),
};
publisher.publish::<WorkQueue>(&event).await?;
}
println!("published {burst_size} tasks\n");
let mut registry = ConsumerGroupRegistry::new(client.clone());
registry
.register::<WorkQueue, TaskHandler>(
RabbitMqConsumerGroupConfig::new(1..=5) .with_prefetch_count(10) .with_max_retries(3),
|| TaskHandler, (), )
.await?;
registry.start_all();
println!("consumer group started (min_consumers=1)\n");
let registry = Arc::new(Mutex::new(registry));
let mgmt_config =
ManagementConfig::new(format!("http://localhost:{mgmt_port}"), "guest", "guest");
let mut autoscaler = RabbitMqAutoscalerBackend::autoscaler(
&mgmt_config,
registry.clone(),
AutoscalerConfig {
poll_interval: Duration::from_secs(2),
scale_up_multiplier: 1.5,
scale_down_multiplier: 0.3,
hysteresis_duration: Duration::from_secs(4),
cooldown_duration: Duration::from_secs(8),
},
)
.expect("failed to create autoscaler");
let shutdown = client.shutdown_token();
let s = shutdown.clone();
let autoscaler_task = tokio::spawn(async move {
autoscaler.run(s).await;
});
println!("autoscaler running — watch consumer count change\n");
for _ in 0..20 {
tokio::time::sleep(Duration::from_secs(3)).await;
let reg = registry.lock().await;
if let Some(group) = reg.groups().get(WorkQueue::topology().queue()) {
println!(
"[monitor] active_consumers={} queue={}",
group.active_consumers(),
group.queue(),
);
}
}
println!("\nshutting down...");
registry.lock().await.shutdown_all().await;
client.shutdown().await;
let _ = autoscaler_task.await;
println!("done");
drop(container);
Ok(())
}