shove 0.7.4

Async tasks via pubsub on steroids. Comes with built-in support for complex queue configurations, audit logs, autoscaling consumer groups and more.
Documentation
//! Sequenced (per-key FIFO) delivery with `SequenceFailure::FailAll`.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;

use shove::inmemory::{
    InMemoryBroker, InMemoryConsumer, InMemoryPublisher, InMemoryTopologyDeclarer,
};
use shove::{
    Consumer, ConsumerOptions, MessageHandler, MessageMetadata, Outcome, Publisher,
    SequenceFailure, SequencedTopic, Topic, TopologyBuilder, declare_topic,
};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct LedgerEntry {
    account: String,
    seq: u64,
    amount: i64,
}

struct LedgerTopic;
impl Topic for LedgerTopic {
    type Message = LedgerEntry;
    fn topology() -> &'static shove::QueueTopology {
        static T: std::sync::OnceLock<shove::QueueTopology> = std::sync::OnceLock::new();
        T.get_or_init(|| {
            TopologyBuilder::new("ledger")
                .sequenced(SequenceFailure::FailAll)
                .routing_shards(4)
                .hold_queue(Duration::from_millis(50))
                .dlq()
                .build()
        })
    }
    const SEQUENCE_KEY_FN: Option<fn(&Self::Message) -> String> = Some(Self::sequence_key);
}
impl SequencedTopic for LedgerTopic {
    fn sequence_key(msg: &LedgerEntry) -> String {
        msg.account.clone()
    }
}

#[derive(Clone)]
struct Handler {
    acked: Arc<AtomicUsize>,
}
impl MessageHandler<LedgerTopic> for Handler {
    async fn handle(&self, msg: LedgerEntry, _: MessageMetadata) -> Outcome {
        println!(
            "applied entry account={} seq={} amount={:+}",
            msg.account, msg.seq, msg.amount
        );
        self.acked.fetch_add(1, Ordering::Relaxed);
        Outcome::Ack
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let broker = InMemoryBroker::new();
    let declarer = InMemoryTopologyDeclarer::new(broker.clone());
    declare_topic::<LedgerTopic>(&declarer).await.unwrap();

    let acked = Arc::new(AtomicUsize::new(0));
    let handler = Handler {
        acked: acked.clone(),
    };

    let shutdown = CancellationToken::new();
    let consumer = InMemoryConsumer::new(broker.clone());
    let shutdown_for_task = shutdown.clone();
    let consume_handle = tokio::spawn(async move {
        let opts = ConsumerOptions::new(shutdown_for_task).with_prefetch_count(1);
        consumer.run_fifo::<LedgerTopic>(handler, opts).await
    });

    let publisher = InMemoryPublisher::new(broker.clone());
    for seq in 0..5 {
        for account in ["alice", "bob", "carol"] {
            publisher
                .publish::<LedgerTopic>(&LedgerEntry {
                    account: account.into(),
                    seq,
                    amount: 100,
                })
                .await
                .unwrap();
        }
    }

    while acked.load(Ordering::Relaxed) < 15 {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    shutdown.cancel();
    let _ = consume_handle.await;
}