# Spark Channel
A generic event listener and message dispatcher for building event-driven architectures in Rust.
## Overview
Spark Channel is a library that enables modules to communicate with each other through an asynchronous message-passing architecture. It provides a foundation for implementing event-driven systems with support for:
1. **Request/Response Pattern**: Send messages that expect responses
2. **Command Pattern**: Fire-and-forget messages
3. **Graceful Shutdown**: Clean termination of module servers
The library is designed to be generic and can be used with any message and response types that satisfy the required trait bounds.
## Architecture
Spark Channel is built on three main components:
### 1. Cancellation Token
The `SparkChannelCancellationToken` is a wrapper around Tokio's `CancellationToken`. It implements `SparkChannelCancellationTrait`, which provides a method to cancel a running task. Note that `SparkChannelCancellationTrait` is a very loose trait: if you implement your own cancellation mechanism, the trait alone will not give you a functional baseline to build on.
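
To illustrate how little a loose cancellation trait has to promise, here is a minimal sketch of a custom token. It is an illustration under assumptions, not the library's code: `CancelLike` and `FlagToken` are hypothetical names, an `AtomicBool` stands in for Tokio's `CancellationToken`, and the real `SparkChannelCancellationTrait` may use different method names and bounds.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a loose cancellation trait: it only asks for `cancel`.
trait CancelLike {
    fn cancel(&self);
}

/// Hypothetical token backed by a shared flag; clones observe the same flag.
#[derive(Clone, Default)]
struct FlagToken {
    cancelled: Arc<AtomicBool>,
}

impl FlagToken {
    /// Not part of the trait: observers need their own way to check the flag.
    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}

impl CancelLike for FlagToken {
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }
}
```

Because the trait in this sketch covers only `cancel`, everything an observer needs (here `is_cancelled`) has to come from the concrete type.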
### 2. Callback
The callback module provides types for handling one-shot responses:
- `CallbackSender<V>`: A type alias for Tokio's `oneshot::Sender<V>`
- `CallbackWrapper<T, V>`: A struct that wraps a message of type `T` with a sender for a response of type `V`
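
The pairing of a message with a reply sender can be sketched without any dependencies. In the sketch below, `std::sync::mpsc::Sender` stands in for Tokio's `oneshot::Sender`, and the field names `message` and `callback` as well as the function `reply_with_length` are assumptions made for illustration; the library's actual definitions may differ.

```rust
use std::sync::mpsc;

// Assumption: std's channel sender stands in for Tokio's `oneshot::Sender<V>`.
type CallbackSender<V> = mpsc::Sender<V>;

// Field names are hypothetical; the wrapper simply pairs a message with a reply sender.
struct CallbackWrapper<T, V> {
    message: T,
    callback: CallbackSender<V>,
}

/// Example consumer: replies with the length of the wrapped message.
fn reply_with_length(wrapper: CallbackWrapper<String, usize>) {
    // A send error only means the requester stopped waiting, so it is ignored here.
    let _ = wrapper.callback.send(wrapper.message.len());
}
```

The requester keeps the receiving half of the channel and waits on it for the single reply.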
### 3. Listener
The listener module is the core of the library and provides:
- `SparkGenericModuleMessage`: An enum that represents the different types of messages that can be sent:
  - `Request`: A message that expects a response
  - `Command`: A fire-and-forget message
  - `Shutdown`: A message to stop the module server
- `SparkGenericModuleDispatcher`: A struct that provides methods to send messages to a module server:
  - `request`: Sends a request and awaits a response
  - `send_command`: Sends a command without expecting a response
- `SparkGenericModuleHandler`: A trait that defines how module servers handle messages:
  - `handle_request`: Processes a request and returns a `Result<Response, Error>`
  - `handle_command`: Processes a command and returns a `Result<(), Error>`
- `run_module_server`: A function that runs a module server, receiving messages and delegating them to a handler
## Message Flow
Here's how messages flow through the system:
1. **Request/Response Flow**:
   - Client creates a request using the dispatcher's `request` method
   - Under the hood, the dispatcher creates a `CallbackWrapper` with the message and a oneshot sender
   - Server receives the message and calls the handler's `handle_request` method
   - Handler processes the message and returns a `Result<Response, Error>`
   - Server sends the result through the oneshot sender
   - Dispatcher awaits the matching oneshot receiver and returns the response to the client
2. **Command Flow**:
   - Client sends a command message through the dispatcher's `send_command` method
   - Server receives the message and calls the handler's `handle_command` method
   - Handler processes the command and returns a `Result<(), Error>`
   - Any errors are logged but not sent back to the client
3. **Shutdown Flow**:
   - Client sends a `Shutdown` message with a cancellation token
   - Server receives the message, cancels execution, and breaks out of its loop
   - Any tasks listening for the cancellation token will be notified
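
The three flows above can be sketched with the standard library alone. This is a simplified, synchronous illustration and not the library's implementation: threads and `std::sync::mpsc` replace Tokio tasks and channels, an `Arc<AtomicBool>` replaces the cancellation token, and `ModuleMessage`, `run_server`, `request`, and `spawn_server` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Hypothetical, simplified counterpart of `SparkGenericModuleMessage`.
enum ModuleMessage {
    Request(String, Sender<Result<usize, String>>),
    Command(String),
    Shutdown(Arc<AtomicBool>),
}

// Server loop: returns the commands it handled so the caller can inspect them.
fn run_server(rx: Receiver<ModuleMessage>) -> Vec<String> {
    let mut handled_commands = Vec::new();
    for message in rx {
        match message {
            ModuleMessage::Request(text, reply) => {
                let result = if text.is_empty() {
                    Err("empty request".to_string())
                } else {
                    Ok(text.len())
                };
                // The client may have stopped waiting; a failed reply is ignored.
                let _ = reply.send(result);
            }
            // Fire-and-forget: nothing is sent back to the client.
            ModuleMessage::Command(text) => handled_commands.push(text),
            ModuleMessage::Shutdown(token) => {
                // Notify anyone watching the token, then leave the loop.
                token.store(true, Ordering::SeqCst);
                break;
            }
        }
    }
    handled_commands
}

// Dispatcher side of a request: attach a reply channel, send, then wait for the answer.
fn request(tx: &Sender<ModuleMessage>, text: &str) -> Result<usize, String> {
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(ModuleMessage::Request(text.to_string(), reply_tx))
        .map_err(|e| e.to_string())?;
    reply_rx.recv().map_err(|e| e.to_string())?
}

// Starts the server on its own thread and hands back the sending half.
fn spawn_server() -> (Sender<ModuleMessage>, JoinHandle<Vec<String>>) {
    let (tx, rx) = mpsc::channel();
    (tx, thread::spawn(move || run_server(rx)))
}
```

A caller would obtain the sender from `spawn_server`, call `request` for replies, send `Command` values directly, and finish with a `Shutdown` message before joining the thread.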
## Generic Type Parameters
The library uses generics extensively to provide flexibility:
- `Message`: The type of messages sent between modules
- `Response`: The type of responses returned by request handlers
- `CancellationToken`: The type used for cancellation (must implement `SparkChannelCancellationTrait`)
- `Error`: The error type returned by handlers
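
To show how the `Message`, `Response`, and `Error` parameters fit together, here is a small sketch of a handler trait that is generic over them. It is synchronous and omits the cancellation token parameter for brevity; `ModuleHandler`, `CounterMessage`, and `Counter` are hypothetical names, not part of Spark Channel.

```rust
// Hypothetical synchronous counterpart of `SparkGenericModuleHandler`.
trait ModuleHandler<Message, Response, Error> {
    fn handle_request(&mut self, request: Message) -> Result<Response, Error>;
    fn handle_command(&mut self, command: Message) -> Result<(), Error>;
}

enum CounterMessage {
    Add(u32),
    Get,
}

struct Counter {
    total: u32,
}

// Message = CounterMessage, Response = u32, Error = String.
impl ModuleHandler<CounterMessage, u32, String> for Counter {
    fn handle_request(&mut self, request: CounterMessage) -> Result<u32, String> {
        match request {
            CounterMessage::Get => Ok(self.total),
            CounterMessage::Add(_) => Err("Add is a command, not a request".to_string()),
        }
    }

    fn handle_command(&mut self, command: CounterMessage) -> Result<(), String> {
        match command {
            CounterMessage::Add(amount) => {
                self.total += amount;
                Ok(())
            }
            CounterMessage::Get => Err("Get is a request, not a command".to_string()),
        }
    }
}
```

Choosing concrete types at the `impl` site keeps the trait reusable while each module still gets fully typed messages and responses.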
## Error Handling
Errors are handled at several levels:
- Handlers return `Result<Response, Error>` or `Result<(), Error>` types
- The `IntoResult` trait allows for converting between different result types
- Implementations for `eyre::Result` and custom `SparkChannelError` are provided
- Error propagation happens automatically through the request/response flow
The dispatcher methods return a `Result` so that callers can handle the following failure scenarios:
- Failed to send a message
- Failed to receive a response
- Invalid response type (type mismatch)
- Handler errors
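
One way to picture these four failure scenarios is as variants of a single error enum that callers can match on. The sketch below is an assumption for illustration only: `DispatchError` and its variant names are hypothetical, and the real `SparkChannelError` may be structured differently.

```rust
use std::fmt;

// Hypothetical error type; the real `SparkChannelError` may use different variants.
#[derive(Debug, PartialEq)]
enum DispatchError {
    SendFailed,
    ReceiveFailed,
    InvalidResponseType,
    Handler(String),
}

impl fmt::Display for DispatchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DispatchError::SendFailed => write!(f, "failed to send message"),
            DispatchError::ReceiveFailed => write!(f, "failed to receive response"),
            DispatchError::InvalidResponseType => write!(f, "invalid response type"),
            DispatchError::Handler(reason) => write!(f, "handler error: {}", reason),
        }
    }
}

impl std::error::Error for DispatchError {}

// Callers can treat handler errors differently from dispatch-level failures.
fn describe(result: Result<u32, DispatchError>) -> String {
    match result {
        Ok(value) => format!("ok: {}", value),
        Err(DispatchError::Handler(reason)) => {
            format!("handler rejected the request: {}", reason)
        }
        Err(other) => format!("dispatch problem: {}", other),
    }
}
```

Separating handler errors from channel failures lets client code retry or report each kind appropriately.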
## Type Downcasting
The library uses Rust's `Any` trait to support downcasting of response types. This enables handling different response types for different requests while maintaining type safety. The `request` method automatically handles downcasting the received response to the expected type.
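
As a rough illustration of the downcasting step, the sketch below converts a type-erased response back into a concrete type using the standard library's `Box<dyn Any + Send>::downcast`. The helper name `downcast_response` and the `String` error are assumptions; how the library's `request` method performs this internally is not shown in this document.

```rust
use std::any::Any;

// Hypothetical helper: recover a concrete `T` from a type-erased response,
// or report a type mismatch instead of panicking.
fn downcast_response<T: Any>(response: Box<dyn Any + Send>) -> Result<T, String> {
    response
        .downcast::<T>()
        .map(|boxed| *boxed)
        .map_err(|_| "invalid response type".to_string())
}
```

A mismatch surfaces as an `Err` value, which corresponds to the "invalid response type" failure listed under Error Handling.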
## Example Usage
Here's a simplified example of how to use Spark Channel:
```rust
use async_trait::async_trait;
use tokio::sync::mpsc; // assumes the library is used with Tokio's bounded mpsc channel
// Also bring the Spark Channel items used below into scope
// (SparkGenericModuleDispatcher, SparkGenericModuleHandler, SparkGenericModuleMessage,
// SparkChannelCancellationToken, run_module_server); their paths depend on the crate layout.

// Define message and response types
enum MyMessage {
    GetData(String),
    UpdateData(String, u32),
    LogEvent(String),
}

enum MyResponse {
    Data(Vec<u32>),
    Success(bool),
}

// Implement handler
struct MyHandler {
    // Handler state...
}

#[async_trait]
impl SparkGenericModuleHandler<MyMessage, MyResponse, eyre::Error> for MyHandler {
    async fn handle_request(&mut self, request: MyMessage) -> Result<MyResponse, eyre::Error> {
        match request {
            MyMessage::GetData(_key) => {
                // Process request and get data...
                let data = vec![1, 2, 3]; // Example data
                Ok(MyResponse::Data(data))
            }
            MyMessage::UpdateData(_key, _value) => {
                // Update data...
                let success = true; // Example result
                Ok(MyResponse::Success(success))
            }
            MyMessage::LogEvent(_) => {
                // This shouldn't happen as LogEvent is a command
                Err(eyre::eyre!("LogEvent received as request"))
            }
        }
    }

    async fn handle_command(&mut self, command: MyMessage) -> Result<(), eyre::Error> {
        if let MyMessage::LogEvent(event) = command {
            // Log the event...
            println!("Event logged: {}", event);
            Ok(())
        } else {
            Err(eyre::eyre!("Unexpected command type"))
        }
    }
}

// Create and run the server
async fn start_server() {
    // Create channel
    let (tx, rx) = mpsc::channel(32);
    // Create handler
    let handler = MyHandler { /* ... */ };
    // Create dispatcher
    let dispatcher = SparkGenericModuleDispatcher::new(tx);
    // Spawn server
    tokio::spawn(run_module_server(handler, rx));
    // Return dispatcher to client code
    // ...
}

// Client usage
async fn client_code(
    dispatcher: &SparkGenericModuleDispatcher<
        MyMessage,
        MyResponse,
        SparkChannelCancellationToken,
        eyre::Error,
    >,
) {
    // Send a request
    let response = dispatcher
        .request(MyMessage::GetData("user123".to_string()))
        .await
        .unwrap();

    // Typically you'd match on the response type
    if let MyResponse::Data(data) = response {
        println!("Received data: {:?}", data);
    }

    // Send a command
    dispatcher
        .send_command::<_, eyre::Result<()>>(MyMessage::LogEvent("User logged in".to_string()))
        .await
        .unwrap();

    // Shutdown the server
    let token = SparkChannelCancellationToken::new();
    dispatcher
        .sender
        .send(SparkGenericModuleMessage::Shutdown(token))
        .await
        .unwrap();
}
```
## Best Practices
1. **Message Design**:
   - Use enums for message types to represent different operations
   - Keep message payloads small and focused
   - Consider using separate types for commands and requests
2. **Response Handling**:
   - Use appropriate error types for your application
   - Implement proper error handling for response downcasting failures
   - Always check if the response is of the expected type
3. **Concurrency**:
   - Consider using a thread pool for handling CPU-intensive request processing
   - Use appropriate channel buffer sizes to handle backpressure
   - Handle cancellation signals properly in long-running operations
4. **Testing**:
   - Write unit tests for message handlers
   - Use integration tests to verify end-to-end message flows
   - Test error scenarios, including message sending failures and type mismatches
## Usage Details
### Shared State
If you need to share state between multiple handlers or between a handler and other parts of your application, consider using:
- `Arc<Mutex<T>>` for shared mutable state
- `Arc<RwLock<T>>` for read-heavy shared state
- `Arc<T>` for immutable shared state
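
As a minimal sketch of the first option, two handlers can share one store through `Arc<Mutex<T>>`. The names `SharedStore`, `WriterHandler`, and `ReaderHandler` are hypothetical, and the handlers are plain structs rather than `SparkGenericModuleHandler` implementations to keep the example dependency-free.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical shared state: a key/value store guarded by a mutex.
type SharedStore = Arc<Mutex<HashMap<String, u32>>>;

struct WriterHandler {
    store: SharedStore,
}

struct ReaderHandler {
    store: SharedStore,
}

impl WriterHandler {
    fn handle_update(&mut self, key: &str, value: u32) {
        // Keep the critical section short: lock, write, release.
        self.store.lock().unwrap().insert(key.to_string(), value);
    }
}

impl ReaderHandler {
    fn handle_get(&self, key: &str) -> Option<u32> {
        // Copy the value out so the lock is released before returning.
        self.store.lock().unwrap().get(key).copied()
    }
}
```

In async handlers, a `std::sync::Mutex` like this works as long as the guard is not held across an `.await`; if it must be, Tokio's `tokio::sync::Mutex` is the usual alternative.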
### Multiple Handlers
For complex applications, you might want to route different message types to different handlers:
1. Create a router that maintains a mapping of message types to handlers
2. Implement a top-level handler that delegates to the appropriate sub-handler based on message type
3. Use trait bounds to keep message and response types consistent across handlers
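
The routing steps above might look like the following sketch, where a top-level handler delegates to sub-handlers based on the message variant. All names here (`AppMessage`, `Router`, `UserHandler`, `BillingHandler`) are hypothetical, and the handlers are synchronous for brevity.

```rust
// Top-level message type: each variant wraps the message type of one sub-handler.
enum AppMessage {
    User(UserMessage),
    Billing(BillingMessage),
}

enum UserMessage {
    Greet(String),
}

enum BillingMessage {
    Charge(u32),
}

struct UserHandler;

struct BillingHandler {
    balance: u32,
}

impl UserHandler {
    fn handle(&mut self, message: UserMessage) -> Result<String, String> {
        match message {
            UserMessage::Greet(name) => Ok(format!("hello, {}", name)),
        }
    }
}

impl BillingHandler {
    fn handle(&mut self, message: BillingMessage) -> Result<String, String> {
        match message {
            BillingMessage::Charge(amount) if amount <= self.balance => {
                self.balance -= amount;
                Ok(format!("remaining balance: {}", self.balance))
            }
            BillingMessage::Charge(_) => Err("insufficient balance".to_string()),
        }
    }
}

// The router owns the sub-handlers and picks one per message.
struct Router {
    users: UserHandler,
    billing: BillingHandler,
}

impl Router {
    fn handle(&mut self, message: AppMessage) -> Result<String, String> {
        match message {
            AppMessage::User(inner) => self.users.handle(inner),
            AppMessage::Billing(inner) => self.billing.handle(inner),
        }
    }
}
```

Because the routing is an exhaustive `match`, adding a new top-level variant without a handler becomes a compile-time error.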
### Backpressure
To handle high message volumes:
1. Use appropriate channel buffer sizes
2. Implement rate limiting in your dispatcher
3. Consider using a bounded work queue for message processing
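
To make the effect of a bounded buffer concrete, the sketch below uses the standard library's `sync_channel` and a non-blocking `try_send`, so a full queue is reported to the producer instead of growing without limit. `Offer`, `bounded_queue`, and `offer` are hypothetical names; Tokio's bounded `mpsc::Sender` offers a comparable `try_send` for async code.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

// Outcome of a non-blocking attempt to enqueue a message.
#[derive(Debug, PartialEq)]
enum Offer {
    Accepted,
    QueueFull,
    Closed,
}

// Creates a queue that holds at most `capacity` pending messages.
fn bounded_queue(capacity: usize) -> (SyncSender<u32>, Receiver<u32>) {
    sync_channel(capacity)
}

// Tries to enqueue without blocking, so the caller can shed load or retry later.
fn offer(tx: &SyncSender<u32>, value: u32) -> Offer {
    match tx.try_send(value) {
        Ok(()) => Offer::Accepted,
        Err(TrySendError::Full(_)) => Offer::QueueFull,
        Err(TrySendError::Disconnected(_)) => Offer::Closed,
    }
}
```

When a producer sees `QueueFull`, it can drop the message, retry after a delay, or fall back to a blocking `send`, depending on how much loss the application tolerates.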