amqpsy 0.1.0

<h1 align="center">amqpsy</h1>
<div align="center">
 <strong>
     Extremely opinionated AMQP PubSub library
 </strong>
</div>

<br />

<div align="center">
  <!-- Crates version -->
  <a href="https://crates.io/crates/amqpsy">
    <img src="https://img.shields.io/crates/v/amqpsy.svg?style=flat-square"
    alt="Crates.io version" />
  </a>
  <!-- Downloads -->
  <a href="https://crates.io/crates/amqpsy">
    <img src="https://img.shields.io/crates/d/amqpsy.svg?style=flat-square"
      alt="Download" />
  </a>
  <!-- docs.rs docs -->
  <a href="https://docs.rs/amqpsy">
    <img src="https://img.shields.io/badge/docs-latest-blue.svg?style=flat-square"
      alt="docs.rs docs" />
  </a>
</div>
<br/>

Built on top of `lapin`, this library is designed to be a simple, easy-to-use AMQP PubSub library.
It is extremely opinionated and comes with some batteries included.

## Features
- Publishers with default configurations for Events and Commands.
- Consumers have DLQ enabled by default.
- Fluent interface for consumer groups.
- Ergonomic error handling API: retry, DLQ, or invalid message options.
- Enforces Protobuf for messages, ensuring backward compatibility.
- Distributed tracing with OpenTelemetry (OTel): traces are linked from publisher to consumer.
- Chaos Monkey: simulate duplicate messages, publish failures, etc.

## Usage

### Consumer Groups
```rust,ignore
    let context = AppContext::new(); // your app context
    let config = amqpsy::AmqpConfig {
        connection_string: "<your connection string>".to_string(),
    };

    #[rustfmt::skip]
    ConsumerGroup::builder(
        "app_name",
        config,
        context,
    )
    .for_topic_exchange("ledger")
        // Using default settings
        .consume(handlers::CreateOutboundHandler).await?
        // Using custom settings
        .consume_with_config(handlers::OutboundAcceptedHandler, ConsumerConfig::default().with_worker_count(5)).await?
    .then()
    .for_topic_exchange("provider")
        .consume(handlers::provider::InboundTransactionSettledHandler).await?
        .consume(handlers::provider::OutboundAcceptedHandler).await?
        .consume(handlers::provider::OutboundSettledHandler).await?
        .consume(handlers::provider::OutboundFailedHandler).await?
    .run_until_shutdown().await?; // This will block until shutdown

    // Example handler
    #[derive(Clone, Debug)]
    pub struct CreateOutboundHandler;

    impl AmqpHandler for CreateOutboundHandler {
        type Message = CreateOutboundTransaction; // Must be a Protobuf message
        type Context = AppContext;

        async fn handle(
            &self,
            context: Self::Context,
            message: Self::Message,
        ) -> Result<(), ConsumerError> {
            // Do something with the message

            // Or return an error to nack the message:
            // return Err(ConsumerError::Retry(..)) // the message is retried before being sent to the DLQ
            // return Err(ConsumerError::Fatal(..)) // the message is sent to the DLQ
            // return Err(ConsumerError::Invalid(..)) // the message is dropped
            //
            // You can also use the fluent API to map errors:
            //
            // context
            //      .db.add_transaction(&message).await
            //      .or_transient_error()?; // Other Options: or_fatal_error()? or_invalid_error()?

            Ok(()) // means the message is Ack'd
        }
    }

```
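The fluent helpers mentioned in the handler comments can be pictured as an extension trait on `Result`. The following is a minimal, self-contained sketch of that pattern, not amqpsy's actual code: the `ConsumerError` enum here is a local stand-in with an assumed `String` payload, and `ConsumerResultExt` and `parse_amount` are hypothetical names introduced only for illustration.

```rust
use std::fmt::Display;

// Local stand-in for the library's error type. The three variants mirror the
// outcomes named in the handler comments; the String payload is an assumption.
#[derive(Debug, PartialEq)]
pub enum ConsumerError {
    Retry(String),   // retried, then sent to the DLQ
    Fatal(String),   // sent to the DLQ
    Invalid(String), // dropped
}

// Hypothetical extension trait: turns any `Result<T, E>` into a
// `Result<T, ConsumerError>`, choosing the failure category at the call site.
pub trait ConsumerResultExt<T> {
    fn or_transient_error(self) -> Result<T, ConsumerError>;
    fn or_fatal_error(self) -> Result<T, ConsumerError>;
    fn or_invalid_error(self) -> Result<T, ConsumerError>;
}

impl<T, E: Display> ConsumerResultExt<T> for Result<T, E> {
    fn or_transient_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Retry(e.to_string()))
    }

    fn or_fatal_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Fatal(e.to_string()))
    }

    fn or_invalid_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Invalid(e.to_string()))
    }
}

// Illustrative use: a malformed amount will never parse on retry,
// so the parse failure is mapped to `Invalid`.
pub fn parse_amount(raw: &str) -> Result<u64, ConsumerError> {
    raw.trim().parse::<u64>().or_invalid_error()
}
```

With a trait like this in scope, the category is decided next to the call that can fail, and `?` propagates the mapped error out of the handler.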

### Publishers

Pick one of three options using the `AmqpPublisher::new_*` constructors:
* Command: publisher confirms and mandatory routing are both enabled.
* Event: publisher confirms are enabled, mandatory routing is not.
* Event without publisher confirmation: neither publisher confirms nor mandatory routing is enabled.

```rust,ignore
// commands::CreateOutboundTransaction and events::OutboundTransactionSettled are Protobuf messages
let command_publisher =
    AmqpPublisher::<commands::CreateOutboundTransaction>::new_for_command(
        config,
        "exchange_name",
    )
    .await?;
let event_publisher = AmqpPublisher::<events::OutboundTransactionSettled>::new_for_event(
    config,
    "exchange_name",
)
.await?; // or new_for_event_without_publisher_confirmation

// `command` and `event` below are message values built elsewhere in your application.

// Publish a command
// Commands use publisher confirms and mandatory routing
command_publisher.publish(&command).await?;

// Publish an event
// Events use publisher confirms but no mandatory routing
event_publisher.publish(&event).await?;
```
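To make the difference between the three publisher options listed above easier to compare, here is a small sketch that models them as plain data. `PublisherKind` and `PublishSettings` are hypothetical names and are not part of amqpsy. The sketch only encodes the two flags the list describes. In AMQP 0-9-1 terms, publisher confirms make the broker acknowledge each publish, and the mandatory flag makes the broker return messages it cannot route to any queue.

```rust
// Hypothetical model of the three publisher presets; not amqpsy's own types.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PublisherKind {
    Command,
    Event,
    EventWithoutConfirmation,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PublishSettings {
    // The broker acknowledges each published message.
    pub publisher_confirm: bool,
    // The broker returns messages that cannot be routed to a queue.
    pub mandatory: bool,
}

impl PublisherKind {
    // Maps each preset to the flags described in the list above.
    pub fn settings(self) -> PublishSettings {
        match self {
            PublisherKind::Command => PublishSettings {
                publisher_confirm: true,
                mandatory: true,
            },
            PublisherKind::Event => PublishSettings {
                publisher_confirm: true,
                mandatory: false,
            },
            PublisherKind::EventWithoutConfirmation => PublishSettings {
                publisher_confirm: false,
                mandatory: false,
            },
        }
    }
}
```

Read this way, a command is expected to reach a consumer queue, so an unroutable command is treated as a failure, while an event may legitimately have no subscribers.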


## Distributed Tracing

Traces are linked between publisher and consumer, with the necessary tags set on them (prefixed with `amqpsy.*`).

![example 1](assets/tracing_example_1.png)

![example 2](assets/tracing_example_2.png)

## Chaos

Simulate duplicate messages, publish failures, etc. in your system.

The example below shows a system with 10% duplicate messages and 10% publish failures.
![chaos example 2](assets/chaos_1.png)

* Enable the `chaos` feature.
* Specify at least one of the chaos parameters as an environment variable:
  * `AMQPSY_CHAOS_PUBLISHER_DUPLICATE_PERCENTAGE`: percentage of publishes that are sent to the broker more than once.
  * `AMQPSY_CHAOS_PUBLISHER_FAILURE_PERCENTAGE`: percentage of publishes that fail with a random error.

If you enable the `chaos` feature but do not specify any parameter, the application will crash at startup.
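The startup rule above (at least one chaos parameter must be set) can be illustrated with a small validation routine. This is a sketch under stated assumptions, not amqpsy's implementation: `ChaosSettings`, `parse_percentage`, and `chaos_settings_from` are hypothetical names, and the sketch assumes the values are whole numbers between 0 and 100. It takes the variables as a map so it can be exercised without touching the process environment.

```rust
use std::collections::HashMap;

pub const DUPLICATE_VAR: &str = "AMQPSY_CHAOS_PUBLISHER_DUPLICATE_PERCENTAGE";
pub const FAILURE_VAR: &str = "AMQPSY_CHAOS_PUBLISHER_FAILURE_PERCENTAGE";

// Hypothetical container for the two documented chaos parameters.
#[derive(Debug, PartialEq)]
pub struct ChaosSettings {
    pub duplicate_percentage: Option<u8>,
    pub failure_percentage: Option<u8>,
}

// Parses one optional percentage. Assumption: integers in 0..=100 only.
fn parse_percentage(name: &str, raw: Option<&String>) -> Result<Option<u8>, String> {
    match raw {
        None => Ok(None),
        Some(value) => match value.trim().parse::<u8>() {
            Ok(p) if p <= 100 => Ok(Some(p)),
            _ => Err(format!(
                "{} must be an integer between 0 and 100, got {:?}",
                name, value
            )),
        },
    }
}

// Builds the settings from a map of variables and rejects the case where
// chaos is enabled but neither parameter is present.
pub fn chaos_settings_from(vars: &HashMap<String, String>) -> Result<ChaosSettings, String> {
    let duplicate_percentage = parse_percentage(DUPLICATE_VAR, vars.get(DUPLICATE_VAR))?;
    let failure_percentage = parse_percentage(FAILURE_VAR, vars.get(FAILURE_VAR))?;

    if duplicate_percentage.is_none() && failure_percentage.is_none() {
        return Err("chaos is enabled but no chaos parameter is set".to_string());
    }

    Ok(ChaosSettings {
        duplicate_percentage,
        failure_percentage,
    })
}
```

In an application, the map could be built with `std::env::vars().collect::<HashMap<String, String>>()`, and an `Err` at startup would correspond to the crash described above.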