shove 0.11.5

Async tasks via pubsub on steroids. Comes with built-in support for complex queue configurations, audit logs, autoscaling consumer groups and more.
Documentation
//! Consumer group example (SQS backend).
//!
//! Demonstrates: `SqsConsumerGroupRegistry`, `SqsConsumerGroupConfig`,
//! `SqsQueueStatsProvider`, and dynamic queue depth monitoring.
//!
//! Note: SQS has no broker-level coordinated-group primitive — the SQS
//! registry spawns independent poll workers. The generic `Broker<Sqs>`
//! deliberately exposes only a supervisor (see `Sqs`'s doctest), so this
//! example stays on the backend-specific `SqsConsumerGroupRegistry` path.
//!
//! Spins up a LocalStack testcontainer automatically. Requires a running
//! Docker daemon and the `LOCALSTACK_AUTH_TOKEN` environment variable:
//!
//!     LOCALSTACK_AUTH_TOKEN=... cargo run --example sqs_consumer_groups --features aws-sns-sqs

use std::sync::Arc;
use std::time::Duration;

use serde::{Deserialize, Serialize};
use shove::sns::*;
use shove::*;
use testcontainers::ImageExt;
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::localstack::LocalStack;
use tokio::sync::Mutex;

// ─── Message type ───────────────────────────────────────────────────────────

/// Message payload published to, and consumed from, the `WorkQueue` topic.
///
/// The serde derives presumably supply the wire encoding used by the
/// publisher and the consumers — confirm against the crate's codec docs.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TaskEvent {
    /// Identifier of the task; printed by the handler when it processes the message.
    task_id: String,
    /// Free-form task body. Filled in at publish time; the handler in this
    /// example does not read it.
    payload: String,
}

// ─── Topic ──────────────────────────────────────────────────────────────────

// Declares the `WorkQueue` topic type with `TaskEvent` as its message type.
// The topology is named "sqs-work-queue"; `.dlq()` presumably attaches a
// dead-letter queue to it — confirm against the `TopologyBuilder` docs.
define_topic!(
    WorkQueue,
    TaskEvent,
    TopologyBuilder::new("sqs-work-queue").dlq().build()
);

// ─── Handler ────────────────────────────────────────────────────────────────

// Handler must be Clone for SqsConsumerGroup (each spawned consumer gets a clone).
// It is a unit struct: the handler carries no state of its own.
#[derive(Clone)]
struct TaskHandler;

impl MessageHandler<WorkQueue> for TaskHandler {
    // This example needs no shared handler state, so the context is the unit type.
    type Context = ();

    /// Processes one `TaskEvent`: logs the task id together with its attempt
    /// number, pauses for 200 ms to stand in for real work, and acknowledges
    /// the message unconditionally.
    async fn handle(&self, task: TaskEvent, meta: MessageMetadata, _ctx: &()) -> Outcome {
        // Printed as `retry_count + 1` so the log shows a 1-based attempt
        // number (presumably `retry_count` is 0 on first delivery — confirm).
        let attempt = meta.retry_count + 1;
        println!("[worker] task={} attempt={}", task.task_id, attempt);

        // Simulate work without blocking the async runtime thread.
        let simulated_work = Duration::from_millis(200);
        tokio::time::sleep(simulated_work).await;

        Outcome::Ack
    }
}

// ─── Main ───────────────────────────────────────────────────────────────────

/// Runs the example end to end: starts LocalStack, declares the topology,
/// publishes a burst of tasks, starts a consumer group, prints queue depth
/// for a fixed number of samples, then shuts everything down.
///
/// Exits the process with status 1 when `LOCALSTACK_AUTH_TOKEN` is not
/// available; any other setup failure is propagated as the returned error.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // How many queue-depth samples to print, and the pause before each one.
    const MONITOR_SAMPLES: usize = 15;
    const MONITOR_INTERVAL: Duration = Duration::from_secs(2);

    // Prefer the filter supplied through the environment; fall back to
    // debug-level output for shove and for this example.
    let log_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| "shove=debug,sqs_consumer_groups=debug".parse().unwrap());
    tracing_subscriber::fmt().with_env_filter(log_filter).init();

    // Without the token there is nothing useful to do, so explain and bail out.
    let Ok(auth_token) = std::env::var("LOCALSTACK_AUTH_TOKEN") else {
        eprintln!(
            "LOCALSTACK_AUTH_TOKEN is not set. This example requires a LocalStack Pro auth \
             token:\n\n    export LOCALSTACK_AUTH_TOKEN=...\n"
        );
        std::process::exit(1);
    };

    // Dummy credentials and region, exported through the process environment
    // before the client is built.
    for (key, value) in [
        ("AWS_ACCESS_KEY_ID", "test"),
        ("AWS_SECRET_ACCESS_KEY", "test"),
        ("AWS_REGION", "us-east-1"),
    ] {
        // SAFETY: called before any concurrent env access in this process.
        unsafe {
            std::env::set_var(key, value);
        }
    }

    // ── Start LocalStack and point the client at it ──
    let container = LocalStack::default()
        .with_env_var("LOCALSTACK_AUTH_TOKEN", auth_token)
        .start()
        .await?;
    let port = container.get_host_port_ipv4(4566).await?;

    let sns_config = SnsConfig {
        region: Some("us-east-1".into()),
        endpoint_url: Some(format!("http://localhost:{port}")),
    };
    let client = SnsClient::new(&sns_config).await?;

    // ── Publish an initial burst of tasks ──
    //
    // The topology is declared by hand first so that the publisher is able to
    // resolve the SNS ARN. The declarer works on the client-owned topic/queue
    // registries, which are shared with every publisher and consumer group
    // built from the same client.
    let topology_declarer = SnsTopologyDeclarer::new(client.clone());
    topology_declarer.declare(WorkQueue::topology()).await?;

    let task_publisher = SnsPublisher::new(client.clone(), client.topic_registry().clone());
    let burst_size = 50;
    for i in 0..burst_size {
        let task = TaskEvent {
            task_id: format!("TASK-{i:03}"),
            payload: format!("work item {i}"),
        };
        task_publisher.publish::<WorkQueue>(&task).await?;
    }
    println!("published {burst_size} tasks\n");

    // ── Set up consumer group registry ──
    //
    // SqsConsumerGroupRegistry manages named groups of identical consumers.
    // It declares the topology itself and starts consumers at their minimum
    // count. A group reads from one SQS queue and can be scaled up or down
    // manually or through a custom autoscaler.
    let group_config = SqsConsumerGroupConfig::new(1..=5) // min..=max consumers
        .with_prefetch_count(10) // messages per consumer
        .with_max_retries(3);

    let mut groups = SqsConsumerGroupRegistry::new(client.clone());
    groups
        .register::<WorkQueue, TaskHandler>(
            group_config,
            || TaskHandler, // factory — called once per spawned consumer
            (),             // handler context (unit for this example)
        )
        .await?;

    // Bring every registered group up at its minimum consumer count.
    groups.start_all();
    println!("consumer group started (min_consumers=1)\n");

    let groups = Arc::new(Mutex::new(groups));

    // ── Monitor queue depth using SqsQueueStatsProvider ──
    //
    // Sample the queue attributes periodically to watch the backlog drain.
    let queue_stats = SqsQueueStatsProvider::new(client.clone(), client.queue_registry().clone());

    println!("monitoring queue depth — watching backlog drain\n");

    for _ in 0..MONITOR_SAMPLES {
        tokio::time::sleep(MONITOR_INTERVAL).await;

        // A failed sample is reported and skipped; monitoring carries on.
        match queue_stats
            .get_queue_stats(WorkQueue::topology().queue())
            .await
        {
            Err(e) => eprintln!("[monitor] failed to fetch stats: {e}"),
            Ok(depth) => println!(
                "[monitor] messages_ready={} in_flight={}",
                depth.messages_ready, depth.messages_not_visible,
            ),
        }
    }

    // ── Shutdown ──
    //
    // Consumers are stopped first, then the client; the container is released last.
    println!("\nshutting down...");
    groups.lock().await.shutdown_all().await;
    client.shutdown().await;
    println!("done");

    drop(container);
    Ok(())
}