disruptor_rs/
traits.rs

1//! Core traits defining the Disruptor pattern interface.
2//!
3//! This module contains the fundamental traits that make up the Disruptor pattern:
4//! - Event processing and handling
5//! - Sequence management
6//! - Data access and storage
7//! - Execution control
8//!
9//! # Core Traits Overview
10//!
11//! ## Event Processing
12//! - [`EventProcessor`]: Processes events from the ring buffer
13//! - [`EventHandler`]: Handles individual events
14//! - [`EventProducer`]: Produces events into the ring buffer
15//!
16//! ## Sequence Management
17//! - [`Sequencer`]: Manages sequences for event coordination
18//! - [`SequenceBarrier`]: Controls access to sequences
19//! - [`WaitingStrategy`]: Defines how threads wait for available sequences
20//!
21//! ## Data Access
22//! - [`DataProvider`]: Provides safe and unsafe access to the underlying buffer
23//!
24//! ## Execution
25//! - [`Runnable`]: Base interface for executable components
26//! - [`EventProcessorExecutor`]: Manages execution of event processors
27//! - [`ExecutorHandle`]: Controls executor lifecycle
28//!
29//! # Examples
30//!
31//! ```rust
32//! use disruptor_rs::{EventHandler, sequence::Sequence};
33//!
34//! struct MyHandler;
35//! impl EventHandler<i64> for MyHandler {
36//!     fn on_event(&self, event: &i64, sequence: Sequence, end_of_batch: bool) {
37//!         // Process the event
38//!     }
39//!     fn on_start(&self) {}
40//!     fn on_shutdown(&self) {}
41//! }
42//! ```
43
44use std::sync::Arc;
45
46use crate::sequence::{AtomicSequence, Sequence};
47
48/// Controls access to sequences in the ring buffer.
49///
50/// A sequence barrier determines when sequences are available for processing
51/// and manages coordination between different components.
52///
53/// # Methods
54/// * `wait_for` - Blocks until a sequence becomes available
55/// * `signal` - Signals that new sequences may be available
56pub trait SequenceBarrier: Send + Sync {
57    fn wait_for(&self, sequence: Sequence) -> Option<Sequence>;
58    fn signal(&self);
59}
60
61/// Manages sequence generation and coordination in the ring buffer.
62///
63/// The sequencer is responsible for generating new sequence numbers and
64/// managing the relationship between publishers and subscribers.
65///
66/// # Type Parameters
67/// * `Barrier` - The type of sequence barrier used by this sequencer
68///
69/// # Methods
70/// * `add_gating_sequence` - Adds a gating sequence to the sequencer
71/// * `remove_gating_sequence` - Removes a gating sequence from the sequencer
72/// * `create_sequence_barrier` - Creates a sequence barrier with the given gating sequences
73/// * `get_cursor` - Returns the current cursor value
74/// * `next` - Gets the next sequence value
75/// * `publish` - Publishes the given sequence range
76/// * `drain` - Drains the sequencer
77pub trait Sequencer {
78    type Barrier: SequenceBarrier;
79    // Inteferface methods
80    fn add_gating_sequence(&mut self, gating_sequence: &Arc<AtomicSequence>);
81    fn remove_gating_sequence(&mut self, sequence: &Arc<AtomicSequence>) -> bool;
82    fn create_sequence_barrier(&self, gating_sequences: &[Arc<AtomicSequence>]) -> Self::Barrier;
83
84    // Abstract methods
85    fn get_cursor(&self) -> Arc<AtomicSequence>;
86    fn next(&mut self, n: Sequence) -> (Sequence, Sequence);
87    fn publish(&self, low: Sequence, high: Sequence);
88    fn drain(self);
89}
90
91/// Defines how threads wait for available sequences.
92///
93/// Implements the strategy pattern for different waiting behaviors when
94/// sequences are not yet available.
95///
96/// # Examples
97///
98/// ```
99/// use disruptor_rs::traits::WaitingStrategy;
100/// use disruptor_rs::sequence::AtomicSequence;
101///
102/// #[derive(Default)]
103/// struct BlockingWaitStrategy;
104///
105/// impl WaitingStrategy for BlockingWaitStrategy {
106///     fn new() -> Self {
107///         BlockingWaitStrategy
108///     }
109///
110///     fn wait_for<F: Fn() -> bool>(
111///         &self,
112///         sequence: i64,
113///         dependencies: &[std::sync::Arc<AtomicSequence>],
114///         check_alert: F
115///     ) -> Option<i64> {
116///         // Implementation would go here
117///         None
118///     }
119///
120///     fn signal_all_when_blocking(&self) {
121///         // Implementation would go here
122///     }
123/// }
124/// ```
125///
126/// # Methods
127/// * `new` - Creates a new instance of the waiting strategy
128/// * `wait_for` - Waits for the given sequence to be available
129/// * `signal_all_when_blocking` - Signals that all threads should be blocked
130pub trait WaitingStrategy: Default + Send + Sync {
131    fn new() -> Self;
132    fn wait_for<F: Fn() -> bool>(
133        &self,
134        sequence: Sequence,
135        dependencies: &[Arc<AtomicSequence>],
136        check_alert: F,
137    ) -> Option<i64>;
138
139    fn signal_all_when_blocking(&self);
140}
141
142/// Provides safe and unsafe access to the underlying ring buffer.
143///
144/// # Safety
145///
146/// This trait provides both safe and unsafe methods for accessing the buffer.
147/// Implementors must ensure proper bounds checking and memory safety.
148///
149/// # Type Parameters
150/// * `T` - The type of elements stored in the buffer.
151///
152/// This lint allow is necessary because DataProvider::get_mut returns a mutable reference from an immutable one.
153/// This is safe in our case because we use interior mutability (UnsafeCell) in the RingBuffer implementation,
154/// and the safety is guaranteed by the Sequencer which ensures proper synchronization of access to the buffer.
155#[allow(clippy::mut_from_ref)]
156pub trait DataProvider<T>: Send + Sync {
157    fn get_capacity(&self) -> usize;
158
159    /// # Safety
160    /// Caller must ensure the sequence is valid and in bounds
161    unsafe fn get(&self, sequence: Sequence) -> &T;
162
163    /// # Safety
164    /// Caller must ensure the sequence is valid and in bounds and no other references exist
165    unsafe fn get_mut(&self, sequence: Sequence) -> &mut T;
166}
167
168/// Defines the lifecycle operations for executable components.
169///
170/// Provides basic control operations for starting, stopping, and
171/// checking the status of running components.
172///
173/// # Methods
174/// * `run` - Starts the component
175/// * `stop` - Stops the component
176/// * `is_running` - Checks if the component is running
177pub trait Runnable: Send {
178    fn run(&mut self);
179    fn stop(&mut self);
180    fn is_running(&self) -> bool;
181}
182
183/// A trait for providing an event processor.
184/// # Types
185/// - `T`: The type of events to process.
186/// # Methods
187/// * `create`: Creates a new event processor.
188/// * `get_sequence`: Returns the sequence of the event processor.
189pub trait EventProcessor<'a, T> {
190    fn get_cursor(&self) -> Arc<AtomicSequence>;
191    fn create<D: DataProvider<T> + 'a, S: SequenceBarrier + 'a>(
192        self,
193        data_provider: Arc<D>,
194        barrier: S,
195    ) -> Box<dyn Runnable + 'a>;
196    fn get_sequence(&self) -> Arc<AtomicSequence>;
197}
198
199/// A trait for providing an event processor with mutable access to the event.
200pub trait EventProcessorMut<'a, T> {
201    fn get_cursor(&self) -> Arc<AtomicSequence>;
202    fn create<D: DataProvider<T> + 'a, S: SequenceBarrier + 'a>(
203        self,
204        data_provider: Arc<D>,
205        barrier: S,
206    ) -> Box<dyn Runnable + 'a>;
207    fn get_sequence(&self) -> Arc<AtomicSequence>;
208}
209
210/// A trait for providing an event handler.
211/// # Types
212/// - `T`: The type of events to handle.
213/// # Methods
214/// * `on_event`: Handles the given event.
215/// * `on_start`: Called when the event handler starts.
216/// * `on_shutdown`: Called when the event handler shuts down.
217pub trait EventHandler<T> {
218    fn on_event(&self, event: &T, sequence: Sequence, end_of_batch: bool);
219    fn on_start(&self);
220    fn on_shutdown(&self);
221}
222
223/// A trait for providing an event handler with mutable access to the event.
224/// # Types
225/// - `T`: The type of events to handle.
226/// # Methods
227/// * `on_event`: Handles the given event.
228/// * `on_start`: Called when the event handler starts.
229/// * `on_shutdown`: Called when the event handler shuts down.
230pub trait EventHandlerMut<T> {
231    fn on_event(&mut self, event: &T, sequence: Sequence, end_of_batch: bool);
232    fn on_start(&mut self);
233    fn on_shutdown(&mut self);
234}
235
236/// A trait for providing an executor thread handle.
237/// # Methods
238/// * `join`: Joins the executor thread.
239pub trait ExecutorHandle {
240    fn join(self);
241}
242
243/// A trait for providing an executor.
244/// # Types
245/// - `Handle`: The type of executor handle.
246/// # Methods
247/// * `with_runnales`: Creates a new executor with the given runnables.
248/// * `spwan`: Spawns the executor.
249pub trait EventProcessorExecutor<'a> {
250    type Handle: ExecutorHandle;
251    fn with_runnables(runnables: Vec<Box<dyn Runnable + 'a>>) -> Self;
252    fn spawn(self) -> Self::Handle;
253}
254
255/// A trait for producing events.
256/// # Types
257/// - `Item`: The type of events to produce.
258/// # Methods
259/// * `write`: Writes the given event.
260/// * `drain`: Drains the event producer.
261pub trait EventProducer<'a> {
262    type Item;
263
264    fn write<F, U, I, E>(&mut self, items: I, f: F)
265    where
266        I: IntoIterator<Item = U, IntoIter = E>,
267        E: ExactSizeIterator<Item = U>,
268        F: Fn(&mut Self::Item, Sequence, &U);
269
270    fn drain(self);
271}