disruptor_rs/processor.rs
1//! Event processors that consume and handle events from the ring buffer.
2//!
3//! # Understanding Processors
4//!
5//! Processors are the consumers in the Disruptor pattern. They:
6//! 1. Read events from specific sequences in the ring buffer
7//! 2. Process events through user-defined handlers
8//! 3. Track their progress using sequence counters
9//!
10//! # Usage Examples
11//!
12//! ## Basic Single Consumer
13//! ```rust
14//! # use disruptor_rs::{
15//! # processor::EventProcessorFactory,
16//! # traits::{EventHandler, Runnable},
17//! # sequence::Sequence,
18//! # };
19//!
20//! // 1. Define your event handler
21//! struct MyEvent;
22//! struct MyHandler;
23//!
24//! impl EventHandler<MyEvent> for MyHandler {
25//! fn on_event(&self, _event: &MyEvent, sequence: Sequence, _end_of_batch: bool) {
26//! // Process the event
27//! println!("Processing event at sequence {}", sequence);
28//! }
29//!
30//! // Optional lifecycle methods
31//! fn on_start(&self) {
32//! println!("Handler started");
33//! }
34//!
35//! fn on_shutdown(&self) {}
36//! }
37//!
38//! // 2. Create the processor
39//! let handler = MyHandler;
40//! let processor = EventProcessorFactory::create(handler);
41//! ```
42//!
43//! ## Multiple Dependent Consumers
44//! ```
45//! use disruptor_rs::{
46//! processor::EventProcessorFactory,
47//! traits::{EventHandler, Runnable, DataProvider, EventProcessor, SequenceBarrier},
48//! sequence::Sequence,
49//! };
50//! use std::sync::Arc;
51//!
52//! // Define event type
53//! struct MyEvent;
54//!
55//! // Define handlers
56//! struct HandlerA;
57//! impl EventHandler<MyEvent> for HandlerA {
58//! fn on_event(&self, _: &MyEvent, _: Sequence, _: bool) {}
59//! fn on_start(&self) {}
60//! fn on_shutdown(&self) {}
61//! }
62//!
63//! struct HandlerB;
64//! impl EventHandler<MyEvent> for HandlerB {
65//! fn on_event(&self, _: &MyEvent, _: Sequence, _: bool) {}
66//! fn on_start(&self) {}
67//! fn on_shutdown(&self) {}
68//! }
69//!
70//! // Mock RingBuffer for example
71//! struct MockRingBuffer;
72//! impl DataProvider<MyEvent> for MockRingBuffer {
73//! fn get_capacity(&self) -> usize {
74//! 1024 // Example fixed capacity
75//! }
76//!
77//! unsafe fn get(&self, _: Sequence) -> &MyEvent {
78//! static EVENT: MyEvent = MyEvent;
79//! &EVENT
80//! }
81//! unsafe fn get_mut(&self, _: Sequence) -> &mut MyEvent {
82//! static mut EVENT: MyEvent = MyEvent;
83//! &mut EVENT
84//! }
85//! }
86//!
87//! // Mock Barrier for example
88//! struct MockBarrier;
89//! impl SequenceBarrier for MockBarrier {
90//! fn wait_for(&self, seq: Sequence) -> Option<Sequence> {
91//! Some(seq)
92//! }
93//! fn signal(&self) {}
94//! }
95//!
96//! // Create processors
97//! let processor_a = EventProcessorFactory::create(HandlerA);
98//! let processor_b = EventProcessorFactory::create(HandlerB);
99//!
100//! // Get processor A's sequence for processor B's barrier
101//! let seq_a = processor_a.get_sequence();
102//!
103//! // Create mock ring buffer
104//! let ring_buffer = Arc::new(MockRingBuffer);
105//!
106//! // Create barriers
107//! let barrier_a = MockBarrier;
108//! let barrier_b = MockBarrier;
109//!
110//! // Create runnables
111//! let runnable_a = processor_a.create(ring_buffer.clone(), barrier_a);
112//! let runnable_b = processor_b.create(ring_buffer.clone(), barrier_b);
113//!
114//! // Example of running processors (commented out to avoid actual thread creation in doc tests)
115//! // std::thread::spawn(move || runnable_a.run());
116//! // std::thread::spawn(move || runnable_b.run());
117//! ```
118//!
119//! # Best Practices
120//!
121//! 1. **Handler Design**:
122//! - Keep handlers stateless if possible
123//! - Minimize processing time in `on_event`
124//! - Use `end_of_batch` for batch optimizations
125//!
126//! 2. **Dependencies**:
127//! - Create clear processing chains
128//! - Avoid circular dependencies
129//! - Consider using multiple ring buffers for complex flows
130//!
131//! 3. **Performance**:
132//! - Choose appropriate wait strategies
133//! - Monitor sequence progress
134//! - Consider batch sizes in handler logic
135//!
136//! # Error Handling
137//!
138//! Handlers should manage their own error handling:
139//! ```rust
140//! # use disruptor_rs::{
141//! # traits::EventHandler,
142//! # sequence::Sequence,
143//! # };
144//!
145//! struct MyEvent;
146//! struct MyHandler;
147//!
148//! impl MyHandler {
149//! fn process_event(&self, event: &MyEvent) -> Result<(), Box<dyn std::error::Error>> {
150//! Ok(())
151//! }
152//! }
153//!
154//! impl EventHandler<MyEvent> for MyHandler {
155//! fn on_event(&self, event: &MyEvent, sequence: Sequence, _end_of_batch: bool) {
156//! match self.process_event(event) {
157//! Ok(_) => {
158//! // Normal processing
159//! }
160//! Err(e) => {
161//! // Log error but continue processing
162//! println!("Error processing event at {}: {:?}", sequence, e);
163//! }
164//! }
165//! }
166//!
167//! fn on_start(&self) {}
168//! fn on_shutdown(&self) {}
169//! }
170//! ```
171//!
172//! # Shutdown Handling
173//!
174//! Proper shutdown sequence:
175//! ```
176//! use disruptor_rs::{
177//! processor::EventProcessorFactory,
178//! traits::{EventHandler, Runnable, DataProvider, EventProcessor, SequenceBarrier},
179//! sequence::Sequence,
180//! };
181//! use std::sync::Arc;
182//!
183//! struct MyEvent;
184//! struct MyHandler;
185//!
186//! impl EventHandler<MyEvent> for MyHandler {
187//! fn on_event(&self, _: &MyEvent, _: Sequence, _: bool) {}
188//! fn on_start(&self) {}
189//! fn on_shutdown(&self) {}
190//! }
191//!
192//! struct MockBarrier;
193//! impl SequenceBarrier for MockBarrier {
194//! fn wait_for(&self, seq: Sequence) -> Option<Sequence> {
195//! Some(seq)
196//! }
197//! fn signal(&self) {}
198//! }
199//!
200//! // Mock RingBuffer for example
201//! struct MockRingBuffer;
202//! impl DataProvider<MyEvent> for MockRingBuffer {
203//! fn get_capacity(&self) -> usize {
204//! 1024 // Example fixed capacity
205//! }
206//! unsafe fn get(&self, _: Sequence) -> &MyEvent {
207//! static EVENT: MyEvent = MyEvent;
208//! &EVENT
209//! }
210//! unsafe fn get_mut(&self, _: Sequence) -> &mut MyEvent {
211//! static mut EVENT: MyEvent = MyEvent;
212//! &mut EVENT
213//! }
214//! }
215//!
216//! let barrier = MockBarrier;
217//! let ring_buffer = Arc::new(MockRingBuffer);
218//! let processor = EventProcessorFactory::create(MyHandler);
219//! let mut runnable = processor.create(ring_buffer, barrier);
220//!
221//! // Signal shutdown
222//! runnable.stop();
223//!
224//! // Wait for processing to complete
225//! while runnable.is_running() {
226//! std::thread::sleep(std::time::Duration::from_millis(1000));
227//! }
228//! ```
229//!
230//! # Mutable Event Handlers
231//!
232//! For handlers that need to maintain mutable state:
233//! ```rust
234//! use disruptor_rs::{
235//! processor::EventProcessorFactory,
236//! traits::{EventHandlerMut, Runnable},
237//! sequence::Sequence,
238//! };
239//!
240//! struct MyEvent;
241//!
242//! struct StatefulHandler {
243//! processed_count: usize,
244//! }
245//!
246//! impl EventHandlerMut<MyEvent> for StatefulHandler {
247//! fn on_event(&mut self, _event: &MyEvent, sequence: Sequence, _end_of_batch: bool) {
248//! self.processed_count += 1;
249//! println!("Processed {} events, current sequence: {}", self.processed_count, sequence);
250//! }
251//!
252//! fn on_start(&mut self) {
253//! self.processed_count = 0;
254//! println!("Starting stateful handler");
255//! }
256//!
257//! fn on_shutdown(&mut self) {
258//! println!("Shutting down after processing {} events", self.processed_count);
259//! }
260//! }
261//!
262//! // Create a mutable processor
263//! let handler = StatefulHandler { processed_count: 0 };
264//! let processor = EventProcessorFactory::create_mut(handler);
265//! ```
266
267use std::sync::{
268 atomic::{AtomicU8, Ordering},
269 Arc,
270};
271
272use crate::{
273 sequence::AtomicSequence,
274 traits::{DataProvider, EventHandler, EventProcessor, Runnable, SequenceBarrier},
275 EventHandlerMut, EventProcessorMut,
276};
277
/// Stateless entry point for building event processors.
///
/// The type carries no data; it only groups the `create` and `create_mut`
/// constructors, which wrap a user-supplied handler in a processor that can
/// later be bound to a data provider and a sequence barrier.
#[derive(Debug)]
pub struct EventProcessorFactory;
279
280impl EventProcessorFactory {
281 pub fn create<'a, E, T>(handler: E) -> impl EventProcessor<'a, T>
282 where
283 E: EventHandler<T> + Send + 'a,
284 T: Send + 'a,
285 {
286 Processor {
287 handler,
288 cursor: Default::default(),
289 _marker: Default::default(),
290 }
291 }
292
293 pub fn create_mut<'a, E, T>(handler: E) -> impl EventProcessorMut<'a, T>
294 where
295 E: EventHandlerMut<T> + Send + 'a,
296 T: Send + 'a,
297 {
298 ProcessorMut {
299 handler,
300 cursor: Default::default(),
301 _marker: Default::default(),
302 }
303 }
304}
305
/// Processor for handlers invoked through a shared reference (`EventHandler`).
///
/// Built by `EventProcessorFactory::create` and turned into a runnable by
/// `EventProcessor::create`.
struct Processor<E, T> {
    /// User-supplied handler that receives lifecycle callbacks and events.
    handler: E,
    /// Shared progress counter: cloned out by `get_cursor`/`get_sequence` and
    /// updated by the event loop after each processed batch.
    cursor: Arc<AtomicSequence>,
    /// Ties the processor to the event type `T` without storing a value of it.
    _marker: std::marker::PhantomData<T>,
}
311
/// Processor for handlers invoked through an exclusive reference (`EventHandlerMut`).
///
/// Built by `EventProcessorFactory::create_mut` and turned into a runnable by
/// `EventProcessorMut::create`.
struct ProcessorMut<E, T> {
    /// User-supplied handler that receives lifecycle callbacks and events.
    handler: E,
    /// Shared progress counter: cloned out by `get_cursor`/`get_sequence` and
    /// updated by the event loop after each processed batch.
    cursor: Arc<AtomicSequence>,
    /// Ties the processor to the event type `T` without storing a value of it.
    _marker: std::marker::PhantomData<T>,
}
317
/// Lifecycle state of a runnable processor.
///
/// The discriminant is stored as a `u8` inside an `AtomicU8`, so the
/// representation is pinned with `#[repr(u8)]` and the numeric values are
/// spelled out explicitly.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunnableProcessorState {
    /// Initial state, and the state `run` stores once its event loop has exited.
    Idle = 0,
    /// Stored by `stop`; the event loop exits when it no longer observes `Running`.
    Halted = 1,
    /// Stored by `run` before the handler's `on_start` is invoked.
    Running = 2,
}
323
324struct RunnableProcessor<E, T, D: DataProvider<T>, B: SequenceBarrier> {
325 running: AtomicU8,
326 processor: Processor<E, T>,
327 data_provider: Arc<D>,
328 barrier: B,
329}
330
331struct RunnableProcessorMut<E, T, D: DataProvider<T>, B: SequenceBarrier> {
332 running: AtomicU8,
333 processor: ProcessorMut<E, T>,
334 data_provider: Arc<D>,
335 barrier: B,
336}
337
338impl<E, T, D, B> RunnableProcessor<E, T, D, B>
339where
340 E: EventHandler<T> + Send,
341 D: DataProvider<T>,
342 B: SequenceBarrier,
343 T: Send,
344{
345 fn process_events(&self) {
346 let f = &self.processor.handler;
347 let cursor = &self.processor.cursor;
348 let data_provider = &self.data_provider;
349 let barrier = &self.barrier;
350
351 while self.running.load(Ordering::Acquire) == RunnableProcessorState::Running as u8 {
352 let next_sequence = cursor.get() + 1;
353 let available_sequence = barrier.wait_for(next_sequence);
354
355 match available_sequence {
356 Some(available_sequence) => {
357 for i in next_sequence..=available_sequence {
358 let event = unsafe { data_provider.get(i) };
359 f.on_event(event, i, i == available_sequence);
360 }
361
362 cursor.set(available_sequence);
363 barrier.signal();
364 }
365 None => {
366 return;
367 }
368 }
369 }
370 }
371}
372
373impl<'a, E, T, D, B> RunnableProcessorMut<E, T, D, B>
374where
375 E: EventHandlerMut<T> + Send + 'a,
376 D: DataProvider<T>,
377 B: SequenceBarrier,
378 T: Send + 'a,
379{
380 fn process_events(&mut self) {
381 let f = &mut self.processor.handler;
382 let cursor = &self.processor.cursor;
383 let data_provider = &self.data_provider;
384 let barrier = &self.barrier;
385
386 while self.running.load(Ordering::Acquire) == RunnableProcessorState::Running as u8 {
387 let next_sequence = cursor.get() + 1;
388 let available_sequence = barrier.wait_for(next_sequence);
389
390 match available_sequence {
391 Some(available_sequence) => {
392 for i in next_sequence..=available_sequence {
393 let event = unsafe { data_provider.get(i) };
394 f.on_event(event, i, i == available_sequence);
395 }
396
397 cursor.set(available_sequence);
398 barrier.signal();
399 }
400 None => {
401 return;
402 }
403 }
404 }
405 }
406}
407
408impl<'a, E, T> EventProcessor<'a, T> for Processor<E, T>
409where
410 E: EventHandler<T> + Send + 'a,
411 T: Send + 'a,
412{
413 fn get_cursor(&self) -> Arc<AtomicSequence> {
414 self.cursor.clone()
415 }
416
417 fn create<D: DataProvider<T> + 'a, S: SequenceBarrier + 'a>(
418 self,
419 data_provider: Arc<D>,
420 barrier: S,
421 ) -> Box<dyn Runnable + 'a> {
422 Box::new(RunnableProcessor {
423 running: AtomicU8::new(RunnableProcessorState::Idle as u8),
424 processor: self,
425 data_provider,
426 barrier,
427 })
428 }
429
430 fn get_sequence(&self) -> Arc<AtomicSequence> {
431 self.cursor.clone()
432 }
433}
434
435impl<'a, E, T> EventProcessorMut<'a, T> for ProcessorMut<E, T>
436where
437 E: EventHandlerMut<T> + Send + 'a,
438 T: Send + 'a,
439{
440 fn get_cursor(&self) -> Arc<AtomicSequence> {
441 self.cursor.clone()
442 }
443
444 fn create<D: DataProvider<T> + 'a, S: SequenceBarrier + 'a>(
445 self,
446 data_provider: Arc<D>,
447 barrier: S,
448 ) -> Box<dyn Runnable + 'a> {
449 Box::new(RunnableProcessorMut {
450 running: AtomicU8::new(RunnableProcessorState::Idle as u8),
451 processor: self,
452 data_provider,
453 barrier,
454 })
455 }
456
457 fn get_sequence(&self) -> Arc<AtomicSequence> {
458 self.cursor.clone()
459 }
460}
461
462impl<E, T, D, B> Runnable for RunnableProcessor<E, T, D, B>
463where
464 E: EventHandler<T> + Send,
465 D: DataProvider<T>,
466 B: SequenceBarrier,
467 T: Send,
468{
469 fn run(&mut self) {
470 self.running.store(
471 RunnableProcessorState::Running as u8,
472 std::sync::atomic::Ordering::Release,
473 );
474 self.processor.handler.on_start();
475 self.process_events();
476 self.running.store(
477 RunnableProcessorState::Idle as u8,
478 std::sync::atomic::Ordering::Release,
479 );
480 self.processor.handler.on_shutdown();
481 }
482
483 fn stop(&mut self) {
484 self.running.store(
485 RunnableProcessorState::Halted as u8,
486 std::sync::atomic::Ordering::Release,
487 );
488 }
489
490 fn is_running(&self) -> bool {
491 self.running.load(std::sync::atomic::Ordering::Acquire)
492 == RunnableProcessorState::Running as u8
493 }
494}
495
496impl<E, T, D, B> Runnable for RunnableProcessorMut<E, T, D, B>
497where
498 E: EventHandlerMut<T> + Send,
499 D: DataProvider<T>,
500 B: SequenceBarrier,
501 T: Send,
502{
503 fn run(&mut self) {
504 self.running.store(
505 RunnableProcessorState::Running as u8,
506 std::sync::atomic::Ordering::Release,
507 );
508 self.processor.handler.on_start();
509 self.process_events();
510 self.running.store(
511 RunnableProcessorState::Idle as u8,
512 std::sync::atomic::Ordering::Release,
513 );
514 self.processor.handler.on_shutdown();
515 }
516
517 fn stop(&mut self) {
518 self.running.store(
519 RunnableProcessorState::Halted as u8,
520 std::sync::atomic::Ordering::Release,
521 );
522 }
523
524 fn is_running(&self) -> bool {
525 self.running.load(std::sync::atomic::Ordering::Acquire)
526 == RunnableProcessorState::Running as u8
527 }
528}