bunbun-worker 0.2.1

An rpc/non-rpc rabbitmq worker library
Documentation
<center> <h1> BunBun-Worker </h1> </center>

> ❗ Disclaimer  
> This crate is still under development, meaning api's may change on every commit...

# Introduction

`bunbun-worker` was made to provide a _panic-safe_ multithreaded job-runner server & client for microservices and alike. It supports [RPC](https://wikipedia.org/wiki/Remote_procedure_call) and regular (non-rpc) calls. As of right now only [rabbitmq](https://www.rabbitmq.com/) is supported but [gRPC](https://grpc.io/) will be added too.

### Rpc

Remote procedure call, as it's name says is a message that can be sent to a remote microservice to be processed and the result to be returned. In `bunbun-worker` it's implemented by the following example:

```mermaid
sequenceDiagram
    ServiceA->>+ServiceB: Hey, could you do this job for me?
    Note right of ServiceB: ServiceB does the job
    ServiceB->>+ServiceA: Sure, here is the data result
```

1. ServiceA creates a callback queue that the response shall be sent to.
2. ServiceA sends a json job message to an **already declared** queue on a rabbitmq server.
3. ServiceB is listening on that queue for messages and spawns a new thread for each received.
4. Once ServiceB has finished the work, using the received messages header it responds to the callback queue with the correlation-id.

### Non-rpc

In `bunbun-worker` regular jobs are called _non-rpc_ jobs, indicating that the response is not awaited.

## Installation

Get directly from crates.io

```toml
[dependencies]
bunbun-worker = { version = "0.2.0" }
```

or get it directly from source

```toml
[dependencies]
bunbun-worker = { git = "https://git.4o1x5.dev/4o1x5/bunbun-worker", branch = "master" }
```

## Usage

This example uses [DTO](https://en.wikipedia.org/wiki/Data_transfer_object) as a way to transfer data between services.  
Add `futures` via `cargo add futures`

```rust
// server.rs
#[derive(Clone, Debug)]
pub struct State {
    pub something: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJob {
    send_to: String,
    contents: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJobResult {
    contents: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum EmailJobResultError {
    Errored,
}
impl RPCTask for EmailJob {
        type ErroredResult = EmailJobResultError;
        type Result = EmailJobResult;
        type State = State;
        fn run(
            self,
            state: Arc<Self::State>,
        ) -> futures::prelude::future::BoxFuture<'static, Result<Self::Result, Self::ErroredResult>>
        {
            async move {
                tracing::info!("Sent email to {}", self.send_to);
                tokio::time::sleep(Duration::from_secs(2)).await;
                return Ok(EmailJobResult {
                    contents: self.contents.clone(),
                });
            }
            .boxed()
        }
}

#[tokio::main]
async fn main() {
    let mut listener = Worker::new(
        env::var("AMQP_SERVER_URL").unwrap(),
        WorkerConfig::default(),
    )
    .await;
    listener
        .add_rpc_consumer::<EmailJob>(
            Arc::new(State {
                something: "test".into(),
            }),
            ListenerConfig::default("emailjob").set_prefetch_count(100),
        );
    listener.start_all_listeners().await;
}
```

```rust
// client.rs
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJob {
    send_to: String,
    contents: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub struct EmailJobResult {
    contents: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum EmailJobResultError {
    Errored,
}

// Implement the client side trait, so the caller knows what the return types are
impl RPCClientTask for EmailJob {
    type ErroredResult = EmailJobResultError;
    type Result = EmailJobResult;
}


#[tokio::main]
async fn main() {
 let client = Client::new(env::var("AMQP_SERVER_URL").unwrap().as_str())
            .await
            .unwrap();
        let result = client
            .rpc_call::<EmailJob>(
                EmailJob {
                    send_to: "someone".into(),
                    contents: "something".into(),
                },
                BasicCallOptions::default("emailjob").timeout(Duration::from_secs(3)),
            )
            .await
            .unwrap();
        println!("{:?}",result);
}
```

### Message versioning

In this crate message versioning is done by including `v1.0.0` or such on the end of the queue name, instead of including it in the headers of a message. This reduces the amount of redelivered messages.  
The following example will send a job to a queue named `emailjob-v1.0.0`.

```rust
let result = client
        .rpc_call::<EmailJob>(
            EmailJob {
                send_to: "someone".into(),
                contents: "something".into(),
            },
            BasicCallOptions::default("emailjob")
                .timeout(Duration::from_secs(3))
                .message_version("v1.0.0")
        )
        .await
        .unwrap();
```

# Limitations

1. Currently some `unwrap()`'s are called inside the code and may results in panics (not in the job-runner).
2. limited API

# Bugs department

Since the code is hosted on a private git instance (as of right now) any bugs shall be discussed in [4o1x5's project room](https://matrix.to/#/#projects:4o1x5.dev).

## License

Licensed under [GNU AFFERO GENERAL PUBLIC LICENSE](https://www.gnu.org/licenses/agpl-3.0.en.html)

### Contribution

Currently this library does not accept any contributors, as it's hosted on a private git server.

# Credits

This package was made with the help of-

- [This readme template you are reading right now]https://github.com/webern/cargo-readme/blob/master/README.md
- [Lapin, an extensive easy to use rabbitmq client]https://crates.io/crates/lapin