shove 0.11.5

Async tasks via pubsub on steroids. Comes with built-in support for complex queue configurations, audit logs, autoscaling consumer groups and more.
Documentation
//! Basic pub/sub examples covering all non-sequenced topology configurations (SQS backend).
//!
//! Demonstrates: `define_topic!`, all `Outcome` variants (`Ack`, `Retry`, `Reject`),
//! `publish`, `publish_with_headers`, `publish_batch`, and per-topic consumer
//! registration via `Broker<Sqs>::consumer_supervisor()`.
//!
//! Note: the earlier DLQ-consumer demo (`run_dlq::<DlqOrder>`) isn't wired
//! through the generic `ConsumerSupervisor<B>` wrapper yet — messages that
//! land in the DLQ here are simply left there for inspection.
//!
//! Spins up a LocalStack testcontainer automatically. Requires a running
//! Docker daemon and the `LOCALSTACK_AUTH_TOKEN` environment variable:
//!
//!     LOCALSTACK_AUTH_TOKEN=... cargo run --example sqs_basic_pubsub --features aws-sns-sqs

use std::collections::HashMap;
use std::time::Duration;

use serde::{Deserialize, Serialize};
use shove::sns::SnsConfig;
use shove::{
    Broker, ConsumerOptions, DeadMessageMetadata, MessageHandler, MessageMetadata, Outcome, Sqs,
    TopologyBuilder, define_topic,
};
use testcontainers::ImageExt;
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::localstack::LocalStack;

// ─── Message type ───────────────────────────────────────────────────────────

#[derive(Debug, Clone, Serialize, Deserialize)]
struct OrderEvent {
    order_id: String,
    amount_cents: u64,
}

// ─── Topic definitions ──────────────────────────────────────────────────────

// 1. Minimal: just a queue. No DLQ, no hold queues.
//    Rejected messages are discarded with a warning log.
define_topic!(
    MinimalOrder,
    OrderEvent,
    TopologyBuilder::new("sqs-minimal-orders").build()
);

// 2. With DLQ: rejected messages land in a dead-letter queue for inspection.
define_topic!(
    DlqOrder,
    OrderEvent,
    TopologyBuilder::new("sqs-dlq-orders").dlq().build()
);

// 3. With hold queues + DLQ: escalating retry backoff (5 s → 30 s → 120 s).
//    Messages that exceed max_retries are sent to DLQ.
define_topic!(
    RetryOrder,
    OrderEvent,
    TopologyBuilder::new("sqs-retry-orders")
        .hold_queue(Duration::from_secs(5))
        .hold_queue(Duration::from_secs(30))
        .hold_queue(Duration::from_secs(120))
        .dlq()
        .build()
);

// ─── Handlers ───────────────────────────────────────────────────────────────

// Acks every message.
struct AckHandler;

impl MessageHandler<MinimalOrder> for AckHandler {
    type Context = ();
    async fn handle(&self, msg: OrderEvent, metadata: MessageMetadata, _: &()) -> Outcome {
        println!(
            "[minimal] order={} amount=${:.2} attempt={}",
            msg.order_id,
            msg.amount_cents as f64 / 100.0,
            metadata.retry_count + 1,
        );
        Outcome::Ack
    }
}

// Rejects every message. Also implements handle_dead for DLQ processing.
struct RejectHandler;

impl MessageHandler<DlqOrder> for RejectHandler {
    type Context = ();
    async fn handle(&self, msg: OrderEvent, _metadata: MessageMetadata, _: &()) -> Outcome {
        println!("[dlq] rejecting order={} → DLQ", msg.order_id);
        Outcome::Reject
    }

    async fn handle_dead(&self, msg: OrderEvent, metadata: DeadMessageMetadata, _: &()) {
        println!(
            "[dlq] dead-letter: order={} reason={} deaths={}",
            msg.order_id,
            metadata.reason.as_deref().unwrap_or("unknown"),
            metadata.death_count,
        );
    }
}

// Retries once (simulates transient failure), then acks.
struct RetryHandler;

impl MessageHandler<RetryOrder> for RetryHandler {
    type Context = ();
    async fn handle(&self, msg: OrderEvent, metadata: MessageMetadata, _: &()) -> Outcome {
        println!(
            "[retry] order={} attempt={}",
            msg.order_id,
            metadata.retry_count + 1,
        );
        if metadata.retry_count == 0 {
            println!("[retry]   → transient failure, will Retry");
            Outcome::Retry
        } else {
            println!("[retry]   → success on retry");
            Outcome::Ack
        }
    }
}

// ─── Main ───────────────────────────────────────────────────────────────────

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let auth_token = match std::env::var("LOCALSTACK_AUTH_TOKEN") {
        Ok(t) => t,
        Err(_) => {
            eprintln!(
                "LOCALSTACK_AUTH_TOKEN is not set. This example requires a LocalStack Pro auth \
                 token:\n\n    export LOCALSTACK_AUTH_TOKEN=...\n"
            );
            std::process::exit(1);
        }
    };

    // Dummy AWS credentials for LocalStack.
    // SAFETY: called before any concurrent env access in this process.
    unsafe {
        std::env::set_var("AWS_ACCESS_KEY_ID", "test");
        std::env::set_var("AWS_SECRET_ACCESS_KEY", "test");
        std::env::set_var("AWS_REGION", "us-east-1");
    }

    let container = LocalStack::default()
        .with_env_var("LOCALSTACK_AUTH_TOKEN", auth_token)
        .start()
        .await?;
    let port = container.get_host_port_ipv4(4566).await?;
    let endpoint = format!("http://localhost:{port}");

    // [!region connect]
    let broker = Broker::<Sqs>::new(SnsConfig {
        region: Some("us-east-1".into()),
        endpoint_url: Some(endpoint),
    })
    .await?;
    // [!endregion connect]

    // ── Declare all topologies ──
    // [!region declare]
    let topology = broker.topology();
    topology.declare::<MinimalOrder>().await?;
    topology.declare::<DlqOrder>().await?;
    topology.declare::<RetryOrder>().await?;
    // [!endregion declare]
    println!("topologies declared\n");

    // ── Publish ──
    // [!region publish]
    let publisher = broker.publisher().await?;

    // Single publish
    let order = OrderEvent {
        order_id: "ORD-001".into(),
        amount_cents: 5000,
    };
    publisher.publish::<MinimalOrder>(&order).await?;
    publisher.publish::<DlqOrder>(&order).await?;
    publisher.publish::<RetryOrder>(&order).await?;

    // Publish with custom headers
    let mut headers = HashMap::new();
    headers.insert("x-source".into(), "example".into());
    headers.insert("x-priority".into(), "high".into());
    let order2 = OrderEvent {
        order_id: "ORD-002".into(),
        amount_cents: 9900,
    };
    publisher
        .publish_with_headers::<MinimalOrder>(&order2, headers)
        .await?;

    // Batch publish
    let batch = vec![
        OrderEvent {
            order_id: "ORD-003".into(),
            amount_cents: 1000,
        },
        OrderEvent {
            order_id: "ORD-004".into(),
            amount_cents: 2500,
        },
        OrderEvent {
            order_id: "ORD-005".into(),
            amount_cents: 7777,
        },
    ];
    publisher.publish_batch::<MinimalOrder>(&batch).await?;
    // [!endregion publish]
    println!("messages published\n");

    // ── Start consumers ──
    // [!region consume]
    let mut supervisor = broker.consumer_supervisor();
    supervisor.register::<MinimalOrder, _>(AckHandler, ConsumerOptions::<Sqs>::new())?;
    supervisor.register::<DlqOrder, _>(RejectHandler, ConsumerOptions::<Sqs>::new())?;
    supervisor.register::<RetryOrder, _>(
        RetryHandler,
        ConsumerOptions::<Sqs>::new().with_max_retries(3),
    )?;

    // Let everything run, then shut down.
    let outcome = supervisor
        .run_until_timeout(
            async {
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(10)) => {}
                    _ = tokio::signal::ctrl_c() => {}
                }
            },
            Duration::from_secs(5),
        )
        .await;
    // [!endregion consume]

    println!("done");
    std::process::exit(outcome.exit_code());
}