# Ollie
Ollie is a Rust crate built on top of Lapin to simplify RabbitMQ interactions. It provides a high-level, HTTP-like API experience for handling message queues. By defining routes (queue names) and corresponding asynchronous handler functions, RabbitRouter makes it easier to process messages while maintaining flexibility.
## Features
- Route-based API: Define routes to route data from queues or exchanges to handler functions.
- Shared state management: Easily pass shared application state to your handlers.
- Exchange support: Bind queues to exchanges with specific routing keys.
- Async handler support: Write asynchronous handlers for processing messages.
- Built-in error handling: Acknowledge messages and optionally publish results to exchanges.
## Installation
Add ollie to your `cargo.toml`
```
cargo add ollie
```
## Getting Started
### 1. Creating a RabbitRouter
Start by creating a new instance of RabbitRouter with a RabbitMQ connection URI and an initial state:
``` rust
use ollie::RabbitRouter;
use std::collections::HashMap;
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let initial_state = HashMap::new(); // Shared state for handlers
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
}
```
### 2. Connect to a queue or exchange
You can add routes for specific queues or exchanges. Each route requires a handler function that defines how messages should be processed.
#### Connecting directly to a queue
You can connect directly to a queue to receive data from that queue. Since it's a RabbitMq queue only one consumer will handle the message. You might use this to attach many workers to the same queue.
```rust
use ollie::RabbitResult;
use std::collections::HashMap;
use tokio::sync::Mutex;
use std::sync::Arc;
async fn my_handler(
data: Vec<u8>,
state: Arc<Mutex<HashMap<String, i32>>>, //note that this is the type of the state defined in main but wrapped in Arc<Mutex<>>
) -> Option<RabbitResult> {
let mut state = state.lock().await;
let count = state.entry("processed_count".to_string()).or_insert(0);
*count += 1;
println!("Message: {:?}", String::from_utf8_lossy(&data));
None
}
#[tokio::main]
async fn main() {
let initial_state = HashMap::new();
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
router
.add_route_queue("test_queue", None, my_handler)
.await
.expect("Failed to add route");
}
```
### Connecting to an exchange
```rust
use ollie::{RabbitRouter, Exchange};
#[tokio::main]
async fn main() {
let initial_state = HashMap::new();
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
router
.add_route_exchange("test_exchange", "test.routing.key", None, my_handler)
.await
.expect("Failed to add route to exchange");
}
```
## Advanced Features
### Shared State
RabbitRouter supports shared state between routes, enabling coordinated processing. The router automatically imposes `Arc<Mutex<S>>` around the state type for safe shared state access.
### Result Publishing
If you need to publish results after processing, specify a result_exchange when adding a route. The handler can return a RabbitResult, which is serialized and published to the specified exchange.
How the Results Exchange Works
When you specify a result_exchange, RabbitRouter will automatically publish the result returned by the handler to the specified exchange. A result_exchange has the following components:
Exchange Name: The name of the RabbitMQ exchange where results are published.
Routing Key: A key used to route the result to the appropriate queue.
The handler function can return a RabbitResult containing the data you want to publish. RabbitRouter will serialize this result (e.g., using JSON) and publish it to the result_exchange.
The Router will automatrically set the first two topics in the routing key based on the RabbitResult. The first part of a routing key is the logging level and the second is the billing type.
Example:
```rust
use ollie::{RabbitRouter, RabbitResult, Exchange};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
async fn handler_with_result(
data: Vec<u8>,
state: Arc<Mutex<HashMap<String, i32>>>,
) -> Option<RabbitResult> {
let message = String::from_utf8_lossy(&data);
println!("Processed message: {}", message);
// Return a RabbitResult to be published to the result exchange
Some(RabbitResult {
logging_level: ollie::rabbit_result::LoggingLevel::Info,
billing_type: ollie::rabbit_result::BillingType::NotBilled,
task_result: TaskResult {
task_id: id,
engine_metadata: EngineMetadata {
engine_id: "Filter Engine".to_string(),
time_taken: elapsed,
}
data: json!({"your_data":"here"})
}
})
}
#[tokio::main]
async fn main() {
let state = HashMap::new();
let initial_state = Arc::new(Mutex::new(state));
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state.clone()).await;
let result_exchange = Some(Exchange {
name: "result_exchange".to_string(),
routing_key: "result.key".to_string(),
});
router
.add_route_queue("my_queue", result_exchange, handler_with_result)
.await
.expect("Failed to add route");
}
```
### Error Handling
RabbitRouter automatically acknowledges messages. Customize your handlers to handle errors and return appropriate results.
## Examples
### Multiple Queues Example
```rust
#[tokio::main]
async fn main() {
let state = HashMap::new();
let initial_state = Arc::new(Mutex::new(state));
let router = RabbitRouter::new("amqp://127.0.0.1:5672", initial_state).await;
router
.add_route_queue("queue_1", None, my_handler)
.await
.expect("Failed to add route");
router
.add_route_queue("queue_2", None, my_handler)
.await
.expect("Failed to add route");
}
```
## Testing
Run the included tests to verify functionality:
`cargo test`
## License
This project is licensed under the MIT or Apache 2.0 License
## Contributing
Contributions are welcome! Feel free to open issues or submit pull requests on GitHub.
## Feedback
If you have any questions or feedback, please open an issue on GitHub or contact the maintainers.