disruptor_rs/traits.rs
1//! Core traits defining the Disruptor pattern interface.
2//!
3//! This module contains the fundamental traits that make up the Disruptor pattern:
4//! - Event processing and handling
5//! - Sequence management
6//! - Data access and storage
7//! - Execution control
8//!
9//! # Core Traits Overview
10//!
11//! ## Event Processing
12//! - [`EventProcessor`]: Processes events from the ring buffer
13//! - [`EventHandler`]: Handles individual events
14//! - [`EventProducer`]: Produces events into the ring buffer
15//!
16//! ## Sequence Management
17//! - [`Sequencer`]: Manages sequences for event coordination
18//! - [`SequenceBarrier`]: Controls access to sequences
19//! - [`WaitingStrategy`]: Defines how threads wait for available sequences
20//!
21//! ## Data Access
22//! - [`DataProvider`]: Provides safe and unsafe access to the underlying buffer
23//!
24//! ## Execution
25//! - [`Runnable`]: Base interface for executable components
26//! - [`EventProcessorExecutor`]: Manages execution of event processors
27//! - [`ExecutorHandle`]: Controls executor lifecycle
28//!
29//! # Examples
30//!
31//! ```rust
32//! use disruptor_rs::{EventHandler, sequence::Sequence};
33//!
34//! struct MyHandler;
35//! impl EventHandler<i64> for MyHandler {
36//! fn on_event(&self, event: &i64, sequence: Sequence, end_of_batch: bool) {
37//! // Process the event
38//! }
39//! fn on_start(&self) {}
40//! fn on_shutdown(&self) {}
41//! }
42//! ```
43
44use std::sync::Arc;
45
46use crate::sequence::{AtomicSequence, Sequence};
47
48/// Controls access to sequences in the ring buffer.
49///
50/// A sequence barrier determines when sequences are available for processing
51/// and manages coordination between different components.
52///
53/// # Methods
54/// * `wait_for` - Blocks until a sequence becomes available
55/// * `signal` - Signals that new sequences may be available
56pub trait SequenceBarrier: Send + Sync {
57 fn wait_for(&self, sequence: Sequence) -> Option<Sequence>;
58 fn signal(&self);
59}
60
61/// Manages sequence generation and coordination in the ring buffer.
62///
63/// The sequencer is responsible for generating new sequence numbers and
64/// managing the relationship between publishers and subscribers.
65///
66/// # Type Parameters
67/// * `Barrier` - The type of sequence barrier used by this sequencer
68///
69/// # Methods
70/// * `add_gating_sequence` - Adds a gating sequence to the sequencer
71/// * `remove_gating_sequence` - Removes a gating sequence from the sequencer
72/// * `create_sequence_barrier` - Creates a sequence barrier with the given gating sequences
73/// * `get_cursor` - Returns the current cursor value
74/// * `next` - Gets the next sequence value
75/// * `publish` - Publishes the given sequence range
76/// * `drain` - Drains the sequencer
77pub trait Sequencer {
78 type Barrier: SequenceBarrier;
79 // Inteferface methods
80 fn add_gating_sequence(&mut self, gating_sequence: &Arc<AtomicSequence>);
81 fn remove_gating_sequence(&mut self, sequence: &Arc<AtomicSequence>) -> bool;
82 fn create_sequence_barrier(&self, gating_sequences: &[Arc<AtomicSequence>]) -> Self::Barrier;
83
84 // Abstract methods
85 fn get_cursor(&self) -> Arc<AtomicSequence>;
86 fn next(&mut self, n: Sequence) -> (Sequence, Sequence);
87 fn publish(&self, low: Sequence, high: Sequence);
88 fn drain(self);
89}
90
91/// Defines how threads wait for available sequences.
92///
93/// Implements the strategy pattern for different waiting behaviors when
94/// sequences are not yet available.
95///
96/// # Examples
97///
98/// ```
99/// use disruptor_rs::traits::WaitingStrategy;
100/// use disruptor_rs::sequence::AtomicSequence;
101///
102/// #[derive(Default)]
103/// struct BlockingWaitStrategy;
104///
105/// impl WaitingStrategy for BlockingWaitStrategy {
106/// fn new() -> Self {
107/// BlockingWaitStrategy
108/// }
109///
110/// fn wait_for<F: Fn() -> bool>(
111/// &self,
112/// sequence: i64,
113/// dependencies: &[std::sync::Arc<AtomicSequence>],
114/// check_alert: F
115/// ) -> Option<i64> {
116/// // Implementation would go here
117/// None
118/// }
119///
120/// fn signal_all_when_blocking(&self) {
121/// // Implementation would go here
122/// }
123/// }
124/// ```
125///
126/// # Methods
127/// * `new` - Creates a new instance of the waiting strategy
128/// * `wait_for` - Waits for the given sequence to be available
129/// * `signal_all_when_blocking` - Signals that all threads should be blocked
130pub trait WaitingStrategy: Default + Send + Sync {
131 fn new() -> Self;
132 fn wait_for<F: Fn() -> bool>(
133 &self,
134 sequence: Sequence,
135 dependencies: &[Arc<AtomicSequence>],
136 check_alert: F,
137 ) -> Option<i64>;
138
139 fn signal_all_when_blocking(&self);
140}
141
142/// Provides safe and unsafe access to the underlying ring buffer.
143///
144/// # Safety
145///
146/// This trait provides both safe and unsafe methods for accessing the buffer.
147/// Implementors must ensure proper bounds checking and memory safety.
148///
149/// # Type Parameters
150/// * `T` - The type of elements stored in the buffer.
151///
152/// This lint allow is necessary because DataProvider::get_mut returns a mutable reference from an immutable one.
153/// This is safe in our case because we use interior mutability (UnsafeCell) in the RingBuffer implementation,
154/// and the safety is guaranteed by the Sequencer which ensures proper synchronization of access to the buffer.
155#[allow(clippy::mut_from_ref)]
156pub trait DataProvider<T>: Send + Sync {
157 fn get_capacity(&self) -> usize;
158
159 /// # Safety
160 /// Caller must ensure the sequence is valid and in bounds
161 unsafe fn get(&self, sequence: Sequence) -> &T;
162
163 /// # Safety
164 /// Caller must ensure the sequence is valid and in bounds and no other references exist
165 unsafe fn get_mut(&self, sequence: Sequence) -> &mut T;
166}
167
168/// Defines the lifecycle operations for executable components.
169///
170/// Provides basic control operations for starting, stopping, and
171/// checking the status of running components.
172///
173/// # Methods
174/// * `run` - Starts the component
175/// * `stop` - Stops the component
176/// * `is_running` - Checks if the component is running
177pub trait Runnable: Send {
178 fn run(&mut self);
179 fn stop(&mut self);
180 fn is_running(&self) -> bool;
181}
182
183/// A trait for providing an event processor.
184/// # Types
185/// - `T`: The type of events to process.
186/// # Methods
187/// * `create`: Creates a new event processor.
188/// * `get_sequence`: Returns the sequence of the event processor.
189pub trait EventProcessor<'a, T> {
190 fn get_cursor(&self) -> Arc<AtomicSequence>;
191 fn create<D: DataProvider<T> + 'a, S: SequenceBarrier + 'a>(
192 self,
193 data_provider: Arc<D>,
194 barrier: S,
195 ) -> Box<dyn Runnable + 'a>;
196 fn get_sequence(&self) -> Arc<AtomicSequence>;
197}
198
199/// A trait for providing an event processor with mutable access to the event.
200pub trait EventProcessorMut<'a, T> {
201 fn get_cursor(&self) -> Arc<AtomicSequence>;
202 fn create<D: DataProvider<T> + 'a, S: SequenceBarrier + 'a>(
203 self,
204 data_provider: Arc<D>,
205 barrier: S,
206 ) -> Box<dyn Runnable + 'a>;
207 fn get_sequence(&self) -> Arc<AtomicSequence>;
208}
209
210/// A trait for providing an event handler.
211/// # Types
212/// - `T`: The type of events to handle.
213/// # Methods
214/// * `on_event`: Handles the given event.
215/// * `on_start`: Called when the event handler starts.
216/// * `on_shutdown`: Called when the event handler shuts down.
217pub trait EventHandler<T> {
218 fn on_event(&self, event: &T, sequence: Sequence, end_of_batch: bool);
219 fn on_start(&self);
220 fn on_shutdown(&self);
221}
222
223/// A trait for providing an event handler with mutable access to the event.
224/// # Types
225/// - `T`: The type of events to handle.
226/// # Methods
227/// * `on_event`: Handles the given event.
228/// * `on_start`: Called when the event handler starts.
229/// * `on_shutdown`: Called when the event handler shuts down.
230pub trait EventHandlerMut<T> {
231 fn on_event(&mut self, event: &T, sequence: Sequence, end_of_batch: bool);
232 fn on_start(&mut self);
233 fn on_shutdown(&mut self);
234}
235
236/// A trait for providing an executor thread handle.
237/// # Methods
238/// * `join`: Joins the executor thread.
239pub trait ExecutorHandle {
240 fn join(self);
241}
242
243/// A trait for providing an executor.
244/// # Types
245/// - `Handle`: The type of executor handle.
246/// # Methods
247/// * `with_runnales`: Creates a new executor with the given runnables.
248/// * `spwan`: Spawns the executor.
249pub trait EventProcessorExecutor<'a> {
250 type Handle: ExecutorHandle;
251 fn with_runnables(runnables: Vec<Box<dyn Runnable + 'a>>) -> Self;
252 fn spawn(self) -> Self::Handle;
253}
254
255/// A trait for producing events.
256/// # Types
257/// - `Item`: The type of events to produce.
258/// # Methods
259/// * `write`: Writes the given event.
260/// * `drain`: Drains the event producer.
261pub trait EventProducer<'a> {
262 type Item;
263
264 fn write<F, U, I, E>(&mut self, items: I, f: F)
265 where
266 I: IntoIterator<Item = U, IntoIter = E>,
267 E: ExactSizeIterator<Item = U>,
268 F: Fn(&mut Self::Item, Sequence, &U);
269
270 fn drain(self);
271}