Skip to main content

disruptor_rs/
processor.rs

1//! Event processors that consume and handle events from the ring buffer.
2//!
3//! # Understanding Processors
4//!
5//! Processors are the consumers in the Disruptor pattern. They:
6//! 1. Read events from specific sequences in the ring buffer
7//! 2. Process events through user-defined handlers
8//! 3. Track their progress using sequence counters
9//!
10//! # Usage Examples
11//!
12//! ## Basic Single Consumer
13//! ```rust
14//! # use disruptor_rs::{
15//! #     processor::EventProcessorFactory,
16//! #     traits::{EventHandler, Runnable},
17//! #     sequence::Sequence,
18//! # };
19//!
20//! // 1. Define your event handler
21//! struct MyEvent;
22//! struct MyHandler;
23//!
24//! impl EventHandler<MyEvent> for MyHandler {
25//!     fn on_event(&self, _event: &MyEvent, sequence: Sequence, _end_of_batch: bool) {
26//!         // Process the event
27//!         println!("Processing event at sequence {}", sequence);
28//!     }
29//!
30//!     // Optional lifecycle methods
31//!     fn on_start(&self) {
32//!         println!("Handler started");
33//!     }
34//!
35//!     fn on_shutdown(&self) {}
36//! }
37//!
38//! // 2. Create the processor
39//! let handler = MyHandler;
40//! let processor = EventProcessorFactory::create(handler);
41//! ```
42//!
43//! ## Multiple Dependent Consumers
44//! ```
45//! use disruptor_rs::{
46//!     processor::EventProcessorFactory,
47//!     traits::{EventHandler, Runnable, DataProvider, EventProcessor, SequenceBarrier},
48//!     sequence::Sequence,
49//! };
50//! use std::sync::Arc;
51//!
52//! // Define event type
53//! struct MyEvent;
54//!
55//! // Define handlers
56//! struct HandlerA;
57//! impl EventHandler<MyEvent> for HandlerA {
58//!     fn on_event(&self, _: &MyEvent, _: Sequence, _: bool) {}
59//!     fn on_start(&self) {}
60//!     fn on_shutdown(&self) {}
61//! }
62//!
63//! struct HandlerB;
64//! impl EventHandler<MyEvent> for HandlerB {
65//!     fn on_event(&self, _: &MyEvent, _: Sequence, _: bool) {}
66//!     fn on_start(&self) {}
67//!     fn on_shutdown(&self) {}
68//! }
69//!
70//! // Mock RingBuffer for example
71//! struct MockRingBuffer;
72//! impl DataProvider<MyEvent> for MockRingBuffer {
73//!     fn get_capacity(&self) -> usize {
74//!         1024 // Example fixed capacity
75//!     }
76//!
77//!     unsafe fn get(&self, _: Sequence) -> &MyEvent {
78//!         static EVENT: MyEvent = MyEvent;
79//!         &EVENT
80//!     }
81//!     unsafe fn get_mut(&self, _: Sequence) -> &mut MyEvent {
82//!         static mut EVENT: MyEvent = MyEvent;
83//!         &mut EVENT
84//!     }
85//! }
86//!
87//! // Mock Barrier for example
88//! struct MockBarrier;
89//! impl SequenceBarrier for MockBarrier {
90//!     fn wait_for(&self, seq: Sequence) -> Option<Sequence> {
91//!         Some(seq)
92//!     }
93//!     fn signal(&self) {}
94//! }
95//!
96//! // Create processors
97//! let processor_a = EventProcessorFactory::create(HandlerA);
98//! let processor_b = EventProcessorFactory::create(HandlerB);
99//!
//! // Get processor A's sequence. A real barrier for processor B would be gated on it;
//! // the mock barriers used below ignore it.
101//! let seq_a = processor_a.get_sequence();
102//!
103//! // Create mock ring buffer
104//! let ring_buffer = Arc::new(MockRingBuffer);
105//!
106//! // Create barriers
107//! let barrier_a = MockBarrier;
108//! let barrier_b = MockBarrier;
109//!
110//! // Create runnables
111//! let runnable_a = processor_a.create(ring_buffer.clone(), barrier_a);
112//! let runnable_b = processor_b.create(ring_buffer.clone(), barrier_b);
113//!
114//! // Example of running processors (commented out to avoid actual thread creation in doc tests)
115//! // std::thread::spawn(move || runnable_a.run());
116//! // std::thread::spawn(move || runnable_b.run());
117//! ```
118//!
119//! # Best Practices
120//!
121//! 1. **Handler Design**:
122//!    - Keep handlers stateless if possible
123//!    - Minimize processing time in `on_event`
124//!    - Use `end_of_batch` for batch optimizations
125//!
126//! 2. **Dependencies**:
127//!    - Create clear processing chains
128//!    - Avoid circular dependencies
129//!    - Consider using multiple ring buffers for complex flows
130//!
131//! 3. **Performance**:
132//!    - Choose appropriate wait strategies
133//!    - Monitor sequence progress
134//!    - Consider batch sizes in handler logic
135//!
136//! # Error Handling
137//!
//! `on_event` returns nothing, so handlers must deal with their own errors:
139//! ```rust
140//! # use disruptor_rs::{
141//! #     traits::EventHandler,
142//! #     sequence::Sequence,
143//! # };
144//!
145//! struct MyEvent;
146//! struct MyHandler;
147//!
148//! impl MyHandler {
149//!     fn process_event(&self, event: &MyEvent) -> Result<(), Box<dyn std::error::Error>> {
150//!         Ok(())
151//!     }
152//! }
153//!
154//! impl EventHandler<MyEvent> for MyHandler {
155//!     fn on_event(&self, event: &MyEvent, sequence: Sequence, _end_of_batch: bool) {
156//!         match self.process_event(event) {
157//!             Ok(_) => {
158//!                 // Normal processing
159//!             }
160//!             Err(e) => {
161//!                 // Log error but continue processing
162//!                 println!("Error processing event at {}: {:?}", sequence, e);
163//!             }
164//!         }
165//!     }
166//!
167//!     fn on_start(&self) {}
168//!     fn on_shutdown(&self) {}
169//! }
170//! ```
171//!
172//! # Shutdown Handling
173//!
174//! Proper shutdown sequence:
175//! ```
176//! use disruptor_rs::{
177//!     processor::EventProcessorFactory,
178//!     traits::{EventHandler, Runnable, DataProvider, EventProcessor, SequenceBarrier},
179//!     sequence::Sequence,
180//! };
181//! use std::sync::Arc;
182//!
183//! struct MyEvent;
184//! struct MyHandler;
185//!
186//! impl EventHandler<MyEvent> for MyHandler {
187//!     fn on_event(&self, _: &MyEvent, _: Sequence, _: bool) {}
188//!     fn on_start(&self) {}
189//!     fn on_shutdown(&self) {}
190//! }
191//!
192//! struct MockBarrier;
193//! impl SequenceBarrier for MockBarrier {
194//!     fn wait_for(&self, seq: Sequence) -> Option<Sequence> {
195//!         Some(seq)
196//!     }
197//!     fn signal(&self) {}
198//! }
199//!
200//! // Mock RingBuffer for example
201//! struct MockRingBuffer;
202//! impl DataProvider<MyEvent> for MockRingBuffer {
203//!     fn get_capacity(&self) -> usize {
204//!         1024 // Example fixed capacity
205//!     }
206//!     unsafe fn get(&self, _: Sequence) -> &MyEvent {
207//!         static EVENT: MyEvent = MyEvent;
208//!         &EVENT
209//!     }
210//!     unsafe fn get_mut(&self, _: Sequence) -> &mut MyEvent {
211//!         static mut EVENT: MyEvent = MyEvent;
212//!         &mut EVENT
213//!     }
214//! }
215//!
216//! let barrier = MockBarrier;
217//! let ring_buffer = Arc::new(MockRingBuffer);
218//! let processor = EventProcessorFactory::create(MyHandler);
219//! let mut runnable = processor.create(ring_buffer, barrier);
220//!
221//! // Signal shutdown
222//! runnable.stop();
223//!
224//! // Wait for processing to complete
225//! while runnable.is_running() {
226//!     std::thread::sleep(std::time::Duration::from_millis(1000));
227//! }
228//! ```
229//!
230//! # Mutable Event Handlers
231//!
232//! For handlers that need to maintain mutable state:
233//! ```rust
234//! use disruptor_rs::{
235//!     processor::EventProcessorFactory,
236//!     traits::{EventHandlerMut, Runnable},
237//!     sequence::Sequence,
238//! };
239//!
240//! struct MyEvent;
241//!
242//! struct StatefulHandler {
243//!     processed_count: usize,
244//! }
245//!
246//! impl EventHandlerMut<MyEvent> for StatefulHandler {
247//!     fn on_event(&mut self, _event: &MyEvent, sequence: Sequence, _end_of_batch: bool) {
248//!         self.processed_count += 1;
249//!         println!("Processed {} events, current sequence: {}", self.processed_count, sequence);
250//!     }
251//!
252//!     fn on_start(&mut self) {
253//!         self.processed_count = 0;
254//!         println!("Starting stateful handler");
255//!     }
256//!
257//!     fn on_shutdown(&mut self) {
258//!         println!("Shutting down after processing {} events", self.processed_count);
259//!     }
260//! }
261//!
262//! // Create a mutable processor
263//! let handler = StatefulHandler { processed_count: 0 };
264//! let processor = EventProcessorFactory::create_mut(handler);
265//! ```
266
267use std::sync::{
268    atomic::{AtomicU8, Ordering},
269    Arc,
270};
271
272use crate::{
273    sequence::AtomicSequence,
274    traits::{DataProvider, EventHandler, EventProcessor, Runnable, SequenceBarrier},
275    EventHandlerMut, EventProcessorMut,
276};
277
/// Stateless entry point for building event processors.
///
/// The type has no fields; its constructors are associated functions that wrap
/// a user handler in a processor with a default-initialized cursor.
pub struct EventProcessorFactory;
279
280impl EventProcessorFactory {
281    pub fn create<'a, E, T>(handler: E) -> impl EventProcessor<'a, T>
282    where
283        E: EventHandler<T> + Send + 'a,
284        T: Send + 'a,
285    {
286        Processor {
287            handler,
288            cursor: Default::default(),
289            _marker: Default::default(),
290        }
291    }
292
293    pub fn create_mut<'a, E, T>(handler: E) -> impl EventProcessorMut<'a, T>
294    where
295        E: EventHandlerMut<T> + Send + 'a,
296        T: Send + 'a,
297    {
298        ProcessorMut {
299            handler,
300            cursor: Default::default(),
301            _marker: Default::default(),
302        }
303    }
304}
305
/// Pairs an immutable event handler with the cursor that records its progress.
///
/// `T` is the event type. No field stores a `T`, so it is carried through
/// `PhantomData`.
struct Processor<E, T> {
    /// User-supplied handler invoked for every event.
    handler: E,
    /// Shared sequence counter; the processing loop stores the last handled sequence here.
    cursor: Arc<AtomicSequence>,
    /// Ties the otherwise unused event type `T` to the struct.
    _marker: std::marker::PhantomData<T>,
}
311
/// Pairs a mutable event handler with the cursor that records its progress.
///
/// Same shape as `Processor`; the handler is invoked through `&mut`.
/// `T` is the event type and is carried through `PhantomData`.
struct ProcessorMut<E, T> {
    /// User-supplied handler invoked (mutably) for every event.
    handler: E,
    /// Shared sequence counter; the processing loop stores the last handled sequence here.
    cursor: Arc<AtomicSequence>,
    /// Ties the otherwise unused event type `T` to the struct.
    _marker: std::marker::PhantomData<T>,
}
317
/// Lifecycle state of a runnable processor.
///
/// The state is stored in an `AtomicU8` as the variant's discriminant, so the
/// representation is pinned to `u8` and every `as u8` cast is lossless by
/// construction.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunnableProcessorState {
    /// Initial state, and the state restored when `run` returns.
    Idle = 0,
    /// Stored by `stop`; the processing loop exits once it observes this.
    Halted = 1,
    /// Stored on entry to `run`; the processing loop continues only in this state.
    Running = 2,
}
323
/// A `Processor` bound to its event source and barrier, driven through `Runnable`.
struct RunnableProcessor<E, T, D: DataProvider<T>, B: SequenceBarrier> {
    /// Current lifecycle state, held as a `RunnableProcessorState` discriminant.
    running: AtomicU8,
    /// Handler plus progress cursor.
    processor: Processor<E, T>,
    /// Source that events are read from by sequence.
    data_provider: Arc<D>,
    /// Waited on for the next available sequence and signalled after each batch.
    barrier: B,
}
330
/// A `ProcessorMut` bound to its event source and barrier, driven through `Runnable`.
struct RunnableProcessorMut<E, T, D: DataProvider<T>, B: SequenceBarrier> {
    /// Current lifecycle state, held as a `RunnableProcessorState` discriminant.
    running: AtomicU8,
    /// Mutable handler plus progress cursor.
    processor: ProcessorMut<E, T>,
    /// Source that events are read from by sequence.
    data_provider: Arc<D>,
    /// Waited on for the next available sequence and signalled after each batch.
    barrier: B,
}
337
338impl<E, T, D, B> RunnableProcessor<E, T, D, B>
339where
340    E: EventHandler<T> + Send,
341    D: DataProvider<T>,
342    B: SequenceBarrier,
343    T: Send,
344{
345    fn process_events(&self) {
346        let f = &self.processor.handler;
347        let cursor = &self.processor.cursor;
348        let data_provider = &self.data_provider;
349        let barrier = &self.barrier;
350
351        while self.running.load(Ordering::Acquire) == RunnableProcessorState::Running as u8 {
352            let next_sequence = cursor.get() + 1;
353            let available_sequence = barrier.wait_for(next_sequence);
354
355            match available_sequence {
356                Some(available_sequence) => {
357                    for i in next_sequence..=available_sequence {
358                        let event = unsafe { data_provider.get(i) };
359                        f.on_event(event, i, i == available_sequence);
360                    }
361
362                    cursor.set(available_sequence);
363                    barrier.signal();
364                }
365                None => {
366                    return;
367                }
368            }
369        }
370    }
371}
372
373impl<'a, E, T, D, B> RunnableProcessorMut<E, T, D, B>
374where
375    E: EventHandlerMut<T> + Send + 'a,
376    D: DataProvider<T>,
377    B: SequenceBarrier,
378    T: Send + 'a,
379{
380    fn process_events(&mut self) {
381        let f = &mut self.processor.handler;
382        let cursor = &self.processor.cursor;
383        let data_provider = &self.data_provider;
384        let barrier = &self.barrier;
385
386        while self.running.load(Ordering::Acquire) == RunnableProcessorState::Running as u8 {
387            let next_sequence = cursor.get() + 1;
388            let available_sequence = barrier.wait_for(next_sequence);
389
390            match available_sequence {
391                Some(available_sequence) => {
392                    for i in next_sequence..=available_sequence {
393                        let event = unsafe { data_provider.get(i) };
394                        f.on_event(event, i, i == available_sequence);
395                    }
396
397                    cursor.set(available_sequence);
398                    barrier.signal();
399                }
400                None => {
401                    return;
402                }
403            }
404        }
405    }
406}
407
408impl<'a, E, T> EventProcessor<'a, T> for Processor<E, T>
409where
410    E: EventHandler<T> + Send + 'a,
411    T: Send + 'a,
412{
413    fn get_cursor(&self) -> Arc<AtomicSequence> {
414        self.cursor.clone()
415    }
416
417    fn create<D: DataProvider<T> + 'a, S: SequenceBarrier + 'a>(
418        self,
419        data_provider: Arc<D>,
420        barrier: S,
421    ) -> Box<dyn Runnable + 'a> {
422        Box::new(RunnableProcessor {
423            running: AtomicU8::new(RunnableProcessorState::Idle as u8),
424            processor: self,
425            data_provider,
426            barrier,
427        })
428    }
429
430    fn get_sequence(&self) -> Arc<AtomicSequence> {
431        self.cursor.clone()
432    }
433}
434
435impl<'a, E, T> EventProcessorMut<'a, T> for ProcessorMut<E, T>
436where
437    E: EventHandlerMut<T> + Send + 'a,
438    T: Send + 'a,
439{
440    fn get_cursor(&self) -> Arc<AtomicSequence> {
441        self.cursor.clone()
442    }
443
444    fn create<D: DataProvider<T> + 'a, S: SequenceBarrier + 'a>(
445        self,
446        data_provider: Arc<D>,
447        barrier: S,
448    ) -> Box<dyn Runnable + 'a> {
449        Box::new(RunnableProcessorMut {
450            running: AtomicU8::new(RunnableProcessorState::Idle as u8),
451            processor: self,
452            data_provider,
453            barrier,
454        })
455    }
456
457    fn get_sequence(&self) -> Arc<AtomicSequence> {
458        self.cursor.clone()
459    }
460}
461
462impl<E, T, D, B> Runnable for RunnableProcessor<E, T, D, B>
463where
464    E: EventHandler<T> + Send,
465    D: DataProvider<T>,
466    B: SequenceBarrier,
467    T: Send,
468{
469    fn run(&mut self) {
470        self.running.store(
471            RunnableProcessorState::Running as u8,
472            std::sync::atomic::Ordering::Release,
473        );
474        self.processor.handler.on_start();
475        self.process_events();
476        self.running.store(
477            RunnableProcessorState::Idle as u8,
478            std::sync::atomic::Ordering::Release,
479        );
480        self.processor.handler.on_shutdown();
481    }
482
483    fn stop(&mut self) {
484        self.running.store(
485            RunnableProcessorState::Halted as u8,
486            std::sync::atomic::Ordering::Release,
487        );
488    }
489
490    fn is_running(&self) -> bool {
491        self.running.load(std::sync::atomic::Ordering::Acquire)
492            == RunnableProcessorState::Running as u8
493    }
494}
495
496impl<E, T, D, B> Runnable for RunnableProcessorMut<E, T, D, B>
497where
498    E: EventHandlerMut<T> + Send,
499    D: DataProvider<T>,
500    B: SequenceBarrier,
501    T: Send,
502{
503    fn run(&mut self) {
504        self.running.store(
505            RunnableProcessorState::Running as u8,
506            std::sync::atomic::Ordering::Release,
507        );
508        self.processor.handler.on_start();
509        self.process_events();
510        self.running.store(
511            RunnableProcessorState::Idle as u8,
512            std::sync::atomic::Ordering::Release,
513        );
514        self.processor.handler.on_shutdown();
515    }
516
517    fn stop(&mut self) {
518        self.running.store(
519            RunnableProcessorState::Halted as u8,
520            std::sync::atomic::Ordering::Release,
521        );
522    }
523
524    fn is_running(&self) -> bool {
525        self.running.load(std::sync::atomic::Ordering::Acquire)
526            == RunnableProcessorState::Running as u8
527    }
528}