# Pulses
Pulses is a background job processing library for Rust. It is modular: brokers are pluggable (Redis Streams is the only one currently supported), and message handlers are defined through a type-safe trait.
## Features
- **Pluggable Architecture**: Support for different message brokers. Currently ships with a Redis Streams implementation.
- **Type-Safe Handlers**: Define handlers using Rust traits with compile-time checks.
- **Concurrency Control**: Fine-grained control over worker counts and max-in-flight messages per handler.
- **Reliability**: Built-in mechanisms for message acknowledgement, automatic retries, and reclaiming pending messages from crashed consumers.
- **Graceful Shutdown**: Integrated with the `CancellationToken` from `tokio-util` for clean shutdown sequences.
## Installation
Add `pulses` and `tokio` to your `Cargo.toml`. If you use the crate from your workspace or a local checkout, replace the `version` entry with a `path` or `git` entry:
```toml
[dependencies]
pulses = { version = "0.1.0" } # Or a `path` / `git` dependency
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7" # Provides `CancellationToken`, used in the example below
```
## Usage
Here is a simple example of how to create a handler and run the application using the Redis broker.
### 1. Define a Handler
Implement the `Handler` trait for your logic. You can specify which streams to listen to and how many concurrent workers to run.
```rust
use pulses::broker::redis::RedisBroker;
use pulses::core::{Context, Envelope, Handler, HandlerError, Outcome};

struct MyHandler;

impl Handler<RedisBroker> for MyHandler {
    // The Redis streams this handler will consume from
    const STREAMS: &'static [&'static str] = &["my-stream"];

    // Number of concurrent tasks processing messages for this handler
    const WORKERS: usize = 4;

    async fn handle(
        &self,
        _ctx: &Context<RedisBroker>,
        msg: &Envelope,
    ) -> Result<Outcome, HandlerError> {
        println!("Received message ID: {}, payload: {:?}", msg.id, msg.payload);

        // Return Outcome::Ack to confirm successful processing
        // Other options: Outcome::Retry { after_ms }, Outcome::Drop, Outcome::DeadLetter
        Ok(Outcome::Ack)
    }
}
```
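The choice between the four outcomes listed in the comment above is left to the handler. The following is a minimal sketch of one possible policy, not part of Pulses itself. It uses a local stand-in `Outcome` enum that mirrors the variant names from this README (the real `pulses::core::Outcome` may differ), and the `Failure` type, the `decide` function, and the `attempt` counter are all hypothetical; this README does not say whether `Envelope` exposes a delivery count.

```rust
// Stand-in for the outcome variants named in this README; the real
// `pulses::core::Outcome` may have different fields and derives.
#[derive(Debug, PartialEq)]
enum Outcome {
    Ack,
    Retry { after_ms: u64 },
    Drop,
    DeadLetter,
}

// Hypothetical classification of a processing failure.
#[derive(Debug, Clone, Copy)]
enum Failure {
    Transient, // e.g. a timeout; worth retrying
    Malformed, // the payload can never be processed
}

/// Hypothetical policy: map a processing result and a 1-based attempt
/// number to an outcome.
fn decide(result: Result<(), Failure>, attempt: u32, max_attempts: u32) -> Outcome {
    match result {
        Ok(()) => Outcome::Ack,
        Err(Failure::Malformed) => Outcome::Drop,
        Err(Failure::Transient) if attempt >= max_attempts => Outcome::DeadLetter,
        Err(Failure::Transient) => {
            // Exponential backoff: 100 ms, 200 ms, 400 ms, ... capped at 30 s.
            let exp = attempt.saturating_sub(1).min(16);
            let delay = 100u64.saturating_mul(1u64 << exp);
            Outcome::Retry { after_ms: delay.min(30_000) }
        }
    }
}
```

Under this policy, malformed input is dropped immediately, transient failures are retried with a growing delay, and a message that keeps failing ends up in the dead-letter path instead of looping forever.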
### 2. Run the Application
Initialize the broker, configure the subscription (consumer group settings), register your handlers, and run the app.
```rust
use pulses::App;
use pulses::broker::redis::{RedisBroker, RedisSubscription};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Connect to Redis
    let broker = RedisBroker::connect("redis://127.0.0.1:6379").await?;

    // 2. Configure Subscription (Consumer Group)
    let subscription = RedisSubscription {
        group: "my-group".into(),
        consumer: "consumer-1".into(),
        ..Default::default()
    };

    // 3. Build and Run the App
    let app = App::new(broker, subscription)
        .register(MyHandler);

    println!("Starting Pulses worker...");

    // Use a CancellationToken for graceful shutdown
    let token = CancellationToken::new();

    // In a real application, you would trigger the token on SIGINT/SIGTERM
    // e.g., tokio::signal::ctrl_c().await.unwrap(); token.cancel();
    app.run(token).await?;

    Ok(())
}
```
## Architecture
- **App**: The main entry point that ties the broker, subscription, and handlers together.
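- **Broker**: A trait abstracting the underlying message queue. Redis Streams is the only implementation shipped today; other backends (e.g., Kafka) would plug in by implementing the same trait.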
- **Handler**: User-defined logic for processing messages.
- **Runtime**: Handles the loop of polling, dispatching to handlers, managing retries, and reclaiming stuck messages.
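
To make the division of roles above concrete, here is a toy, synchronous, in-memory model of the poll/dispatch/outcome loop. It is a sketch for illustration only and does not reflect the actual Pulses traits: every name in it (`Message`, `MemoryBroker`, `run`, `Demo`, and the simplified `Broker`, `Handler`, and `Outcome`) is hypothetical, and it leaves out concurrency, retry delays, and reclaiming of pending messages.

```rust
use std::collections::VecDeque;

// Toy message; the real `Envelope` carries more than this.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    id: u64,
    payload: String,
    attempts: u32,
}

// Simplified outcomes for this model only.
#[derive(Debug, PartialEq)]
enum Outcome {
    Ack,
    Retry,
    DeadLetter,
}

// Broker role: hands out messages and records what happened to them.
trait Broker {
    fn poll(&mut self) -> Option<Message>;
    fn ack(&mut self, id: u64);
    fn requeue(&mut self, msg: Message);
    fn dead_letter(&mut self, msg: Message);
}

// Handler role: user logic that turns a message into an outcome.
trait Handler {
    fn handle(&self, msg: &Message) -> Outcome;
}

#[derive(Default)]
struct MemoryBroker {
    queue: VecDeque<Message>,
    acked: Vec<u64>,
    dead: Vec<u64>,
}

impl MemoryBroker {
    fn publish(&mut self, id: u64, payload: &str) {
        self.queue.push_back(Message { id, payload: payload.to_string(), attempts: 0 });
    }
}

impl Broker for MemoryBroker {
    fn poll(&mut self) -> Option<Message> {
        // Each delivery counts as one attempt.
        let mut msg = self.queue.pop_front()?;
        msg.attempts += 1;
        Some(msg)
    }
    fn ack(&mut self, id: u64) {
        self.acked.push(id);
    }
    fn requeue(&mut self, msg: Message) {
        self.queue.push_back(msg);
    }
    fn dead_letter(&mut self, msg: Message) {
        self.dead.push(msg.id);
    }
}

// Runtime role: poll, dispatch, and act on the outcome until the queue drains.
fn run<B: Broker, H: Handler>(broker: &mut B, handler: &H) {
    while let Some(msg) = broker.poll() {
        match handler.handle(&msg) {
            Outcome::Ack => broker.ack(msg.id),
            Outcome::Retry => broker.requeue(msg),
            Outcome::DeadLetter => broker.dead_letter(msg),
        }
    }
}

// Example handler: retries "flaky" once, dead-letters "poison" on its third attempt.
struct Demo;

impl Handler for Demo {
    fn handle(&self, msg: &Message) -> Outcome {
        match msg.payload.as_str() {
            "flaky" if msg.attempts < 2 => Outcome::Retry,
            "poison" if msg.attempts >= 3 => Outcome::DeadLetter,
            "poison" => Outcome::Retry,
            _ => Outcome::Ack,
        }
    }
}
```

Publishing three messages with payloads `"ok"`, `"flaky"`, and `"poison"` to a `MemoryBroker` and calling `run(&mut broker, &Demo)` acknowledges the first two and dead-letters the third. Keeping the loop generic over the two traits is what lets the broker be swapped without touching handler code.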
## License
[License information here]