Expand description
Event processors that consume and handle events from the ring buffer.
§Understanding Processors
Processors are the consumers in the Disruptor pattern. They:
- Read events from specific sequences in the ring buffer
- Process events through user-defined handlers
- Track their progress using sequence counters
§Usage Examples
§Basic Single Consumer
// 1. Define your event handler
struct MyEvent;
struct MyHandler;
impl EventHandler<MyEvent> for MyHandler {
fn on_event(&self, _event: &MyEvent, sequence: Sequence, _end_of_batch: bool) {
// Process the event
println!("Processing event at sequence {}", sequence);
}
// Optional lifecycle methods
fn on_start(&self) {
println!("Handler started");
}
fn on_shutdown(&self) {}
}
// 2. Create the processor
let handler = MyHandler;
let processor = EventProcessorFactory::create(handler);§Multiple Dependent Consumers
use disruptor_rs::{
processor::EventProcessorFactory,
traits::{EventHandler, Runnable, DataProvider, EventProcessor, SequenceBarrier},
sequence::Sequence,
};
use std::sync::Arc;
// Define event type
struct MyEvent;
// Define handlers
struct HandlerA;
impl EventHandler<MyEvent> for HandlerA {
fn on_event(&self, _: &MyEvent, _: Sequence, _: bool) {}
fn on_start(&self) {}
fn on_shutdown(&self) {}
}
struct HandlerB;
impl EventHandler<MyEvent> for HandlerB {
fn on_event(&self, _: &MyEvent, _: Sequence, _: bool) {}
fn on_start(&self) {}
fn on_shutdown(&self) {}
}
// Mock RingBuffer for example
struct MockRingBuffer;
impl DataProvider<MyEvent> for MockRingBuffer {
fn get_capacity(&self) -> usize {
1024 // Example fixed capacity
}
unsafe fn get(&self, _: Sequence) -> &MyEvent {
static EVENT: MyEvent = MyEvent;
&EVENT
}
unsafe fn get_mut(&self, _: Sequence) -> &mut MyEvent {
static mut EVENT: MyEvent = MyEvent;
&mut EVENT
}
}
// Mock Barrier for example
struct MockBarrier;
impl SequenceBarrier for MockBarrier {
fn wait_for(&self, seq: Sequence) -> Option<Sequence> {
Some(seq)
}
fn signal(&self) {}
}
// Create processors
let processor_a = EventProcessorFactory::create(HandlerA);
let processor_b = EventProcessorFactory::create(HandlerB);
// Get processor A's sequence for processor B's barrier
let seq_a = processor_a.get_sequence();
// Create mock ring buffer
let ring_buffer = Arc::new(MockRingBuffer);
// Create barriers
let barrier_a = MockBarrier;
let barrier_b = MockBarrier;
// Create runnables
let runnable_a = processor_a.create(ring_buffer.clone(), barrier_a);
let runnable_b = processor_b.create(ring_buffer.clone(), barrier_b);
// Example of running processors (commented out to avoid actual thread creation in doc tests)
// std::thread::spawn(move || runnable_a.run());
// std::thread::spawn(move || runnable_b.run());§Best Practices
-
Handler Design:
- Keep handlers stateless if possible
- Minimize processing time in
on_event - Use
end_of_batchfor batch optimizations
-
Dependencies:
- Create clear processing chains
- Avoid circular dependencies
- Consider using multiple ring buffers for complex flows
-
Performance:
- Choose appropriate wait strategies
- Monitor sequence progress
- Consider batch sizes in handler logic
§Error Handling
Handlers should manage their own error handling:
struct MyEvent;
struct MyHandler;
impl MyHandler {
fn process_event(&self, event: &MyEvent) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
}
impl EventHandler<MyEvent> for MyHandler {
fn on_event(&self, event: &MyEvent, sequence: Sequence, _end_of_batch: bool) {
match self.process_event(event) {
Ok(_) => {
// Normal processing
}
Err(e) => {
// Log error but continue processing
println!("Error processing event at {}: {:?}", sequence, e);
}
}
}
fn on_start(&self) {}
fn on_shutdown(&self) {}
}§Shutdown Handling
Proper shutdown sequence:
use disruptor_rs::{
processor::EventProcessorFactory,
traits::{EventHandler, Runnable, DataProvider, EventProcessor, SequenceBarrier},
sequence::Sequence,
};
use std::sync::Arc;
struct MyEvent;
struct MyHandler;
impl EventHandler<MyEvent> for MyHandler {
fn on_event(&self, _: &MyEvent, _: Sequence, _: bool) {}
fn on_start(&self) {}
fn on_shutdown(&self) {}
}
struct MockBarrier;
impl SequenceBarrier for MockBarrier {
fn wait_for(&self, seq: Sequence) -> Option<Sequence> {
Some(seq)
}
fn signal(&self) {}
}
// Mock RingBuffer for example
struct MockRingBuffer;
impl DataProvider<MyEvent> for MockRingBuffer {
fn get_capacity(&self) -> usize {
1024 // Example fixed capacity
}
unsafe fn get(&self, _: Sequence) -> &MyEvent {
static EVENT: MyEvent = MyEvent;
&EVENT
}
unsafe fn get_mut(&self, _: Sequence) -> &mut MyEvent {
static mut EVENT: MyEvent = MyEvent;
&mut EVENT
}
}
let barrier = MockBarrier;
let ring_buffer = Arc::new(MockRingBuffer);
let processor = EventProcessorFactory::create(MyHandler);
let mut runnable = processor.create(ring_buffer, barrier);
// Signal shutdown
runnable.stop();
// Wait for processing to complete
while runnable.is_running() {
std::thread::sleep(std::time::Duration::from_millis(1000));
}§Mutable Event Handlers
For handlers that need to maintain mutable state:
use disruptor_rs::{
processor::EventProcessorFactory,
traits::{EventHandlerMut, Runnable},
sequence::Sequence,
};
struct MyEvent;
struct StatefulHandler {
processed_count: usize,
}
impl EventHandlerMut<MyEvent> for StatefulHandler {
fn on_event(&mut self, _event: &MyEvent, sequence: Sequence, _end_of_batch: bool) {
self.processed_count += 1;
println!("Processed {} events, current sequence: {}", self.processed_count, sequence);
}
fn on_start(&mut self) {
self.processed_count = 0;
println!("Starting stateful handler");
}
fn on_shutdown(&mut self) {
println!("Shutting down after processing {} events", self.processed_count);
}
}
// Create a mutable processor
let handler = StatefulHandler { processed_count: 0 };
let processor = EventProcessorFactory::create_mut(handler);