disruptor_rs/
builder.rs

1use std::sync::Arc;
2
3use crate::{
4    executor::ThreadedExecutor,
5    processor::EventProcessorFactory,
6    producer::Producer,
7    ringbuffer::RingBuffer,
8    sequence::AtomicSequence,
9    sequencer::SingleProducerSequencer,
10    traits::{
11        DataProvider, EventHandler, EventProcessor, EventProcessorExecutor, EventProducer,
12        Runnable, Sequencer, WaitingStrategy,
13    },
14    waiting::{BusySpinWaitStrategy, YieldingWaitStrategy},
15    EventHandlerMut, EventProcessorMut,
16};
17
18/// # Disruptor Builder Pattern Guide
19///
20/// The builder follows a type-state pattern to ensure compile-time correctness.
21/// Each step in the builder chain enforces required configuration in a specific order:
22///
23/// 1. Start with data provider (ring buffer)
24/// 2. Configure waiting strategy
25/// 3. Set up sequencer
26/// 4. Add event handlers
27/// 5. Build the final disruptor
28///
29/// ## Example Usage
30/// ```rust
31/// use disruptor_rs::{
32///     DisruptorBuilder, EventHandler, EventProcessorExecutor, EventProducer,
33///     sequence::Sequence,
34/// };
35///
36/// #[derive(Default)]
37/// struct MyEvent;
38///
39/// #[derive(Default)]
40/// struct MyEventHandler;
41///
42/// impl EventHandler<MyEvent> for MyEventHandler {
43///     fn on_event(&self, _event: &MyEvent, _sequence: Sequence, _end_of_batch: bool) {}
44///     fn on_start(&self) {}
45///     fn on_shutdown(&self) {}
46/// }
47///
48/// let (executor, producer) = DisruptorBuilder::with_ring_buffer::<MyEvent>(1024)
49///     .with_busy_spin_waiting_strategy()
50///     .with_single_producer_sequencer()
51///     .with_barrier(|scope| {
52///         scope.handle_events(MyEventHandler::default());
53///     })
54///     .build();
55/// ```
56///
57/// ## Builder States
58/// - `WithDataProvider`: Initial state, holds the ring buffer
59/// - `WithWaitingStrategy`: Configures how consumers wait for new events
60/// - `WithSequencer`: Manages the sequencing of events
61/// - `WithEventHandlers`: Configures event processing chain
62///
63/// ## Barrier Scopes
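/// The `with_barrier` method opens a scope for registering the event handlers
/// that form one stage of a dependency chain. Handlers registered in the same
/// barrier scope process events independently of one another, while handlers
/// in a later barrier scope only process an event after every handler of the
/// preceding scope has finished with it.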
67///
68/// ```rust
69/// use disruptor_rs::{
70///     DisruptorBuilder, EventHandler, EventProcessorExecutor, EventProducer,
71///     sequence::Sequence,
72/// };
73///
74/// #[derive(Default)]
75/// struct MyEvent;
76///
77/// #[derive(Default)]
78/// struct MyEventHandler;
79///
80/// impl EventHandler<MyEvent> for MyEventHandler {
81///     fn on_event(&self, _event: &MyEvent, _sequence: Sequence, _end_of_batch: bool) {}
82///     fn on_start(&self) {}
83///     fn on_shutdown(&self) {}
84/// }
85///
86/// let (executor, producer) = DisruptorBuilder::with_ring_buffer::<MyEvent>(1024)
87///     .with_busy_spin_waiting_strategy()
88///     .with_single_producer_sequencer()
89///     .with_barrier(|scope| {
90///         // These handlers run in parallel
91///         scope.handle_events(MyEventHandler::default());
92///         scope.handle_events(MyEventHandler::default());
93///     })
94///     .with_barrier(|scope| {
95///         // This handler waits for both previous handlers
96///         scope.handle_events(MyEventHandler::default());
97///     })
98///     .build();
99/// ```
100
///
/// ## Entry Points
/// `DisruptorBuilder` is an empty namespace type: it stores nothing and only
/// hosts the associated functions [`DisruptorBuilder::new`] and
/// [`DisruptorBuilder::with_ring_buffer`] that start the builder chain.
#[derive(Debug)]
pub struct DisruptorBuilder {}
103
104pub struct WithDataProvider<D: DataProvider<T>, T>
105where
106    T: Send + Sync,
107{
108    data_provider: Arc<D>,
109    _marker: std::marker::PhantomData<T>,
110}
111
112pub struct WithWaitingStrategy<W: WaitingStrategy, D: DataProvider<T>, T>
113where
114    T: Send + Sync,
115{
116    with_data_provider: WithDataProvider<D, T>,
117    _waiting_strategy: std::marker::PhantomData<W>,
118}
119
120pub struct WithSequencer<S: Sequencer, W: WaitingStrategy, D: DataProvider<T>, T>
121where
122    T: Send + Sync,
123{
124    with_waiting_strategy: WithWaitingStrategy<W, D, T>,
125    sequencer: S,
126}
127
128pub struct BarrierScope<'a, S: Sequencer, D: DataProvider<T>, T> {
129    sequencer: S,
130    data_provider: Arc<D>,
131    gating_sequences: Vec<Arc<AtomicSequence>>,
132    cursors: Vec<Arc<AtomicSequence>>,
133    event_handlers: Vec<Box<dyn Runnable + 'a>>,
134    _element: std::marker::PhantomData<T>,
135}
136
137pub struct WithEventHandlers<'a, S: Sequencer, W: WaitingStrategy, D: DataProvider<T>, T>
138where
139    T: Send + Sync,
140{
141    with_sequencer: WithSequencer<S, W, D, T>,
142    event_handlers: Vec<Box<dyn Runnable + 'a>>,
143    gating_sequences: Vec<Arc<AtomicSequence>>,
144}
145
146impl DisruptorBuilder {
147    #[allow(clippy::new_ret_no_self)]
148    pub fn new<D: DataProvider<T>, T>(data_provider: Arc<D>) -> WithDataProvider<D, T>
149    where
150        T: Send + Sync,
151    {
152        WithDataProvider {
153            data_provider,
154            _marker: std::marker::PhantomData,
155        }
156    }
157
158    pub fn with_ring_buffer<T>(capacity: usize) -> WithDataProvider<RingBuffer<T>, T>
159    where
160        T: Default + Send + Sync,
161    {
162        Self::new(Arc::new(RingBuffer::new(capacity)))
163    }
164}
165
166impl<D: DataProvider<T>, T> WithDataProvider<D, T>
167where
168    T: Send + Sync,
169{
170    pub fn with_waiting_strategy<W: WaitingStrategy>(self) -> WithWaitingStrategy<W, D, T> {
171        WithWaitingStrategy {
172            with_data_provider: self,
173            _waiting_strategy: Default::default(),
174        }
175    }
176
177    pub fn with_busy_spin_waiting_strategy(
178        self,
179    ) -> WithWaitingStrategy<BusySpinWaitStrategy, D, T> {
180        self.with_waiting_strategy()
181    }
182
183    pub fn with_yielding_waiting_strategy(self) -> WithWaitingStrategy<YieldingWaitStrategy, D, T> {
184        self.with_waiting_strategy()
185    }
186}
187
188impl<W: WaitingStrategy, D: DataProvider<T>, T> WithWaitingStrategy<W, D, T>
189where
190    T: Send + Sync,
191{
192    pub fn with_sequencer<S: Sequencer>(self, sequencer: S) -> WithSequencer<S, W, D, T> {
193        WithSequencer {
194            with_waiting_strategy: self,
195            sequencer,
196        }
197    }
198
199    pub fn with_single_producer_sequencer(
200        self,
201    ) -> WithSequencer<SingleProducerSequencer<W>, W, D, T> {
202        let buffer_size = self.with_data_provider.data_provider.get_capacity();
203        self.with_sequencer(SingleProducerSequencer::new(buffer_size, W::new()))
204    }
205}
206
207impl<'a, S: Sequencer + 'a, W: WaitingStrategy, D: DataProvider<T> + 'a, T: Send + Sync + 'a>
208    WithSequencer<S, W, D, T>
209where
210    T: Send + Sync,
211{
212    pub fn with_barrier(
213        mut self,
214        f: impl FnOnce(&mut BarrierScope<'a, S, D, T>),
215    ) -> WithEventHandlers<'a, S, W, D, T> {
216        let cursor = self.sequencer.get_cursor();
217        let mut scope = BarrierScope {
218            sequencer: self.sequencer,
219            data_provider: self
220                .with_waiting_strategy
221                .with_data_provider
222                .data_provider
223                .clone(),
224            gating_sequences: vec![cursor],
225            event_handlers: Vec::new(),
226            cursors: Vec::new(),
227            _element: Default::default(),
228        };
229
230        f(&mut scope);
231        self.sequencer = scope.sequencer;
232
233        WithEventHandlers {
234            with_sequencer: self,
235            event_handlers: scope.event_handlers,
236            gating_sequences: scope.cursors,
237        }
238    }
239}
240
241impl<'a, S: Sequencer + 'a, D: DataProvider<T> + 'a, T: Send + 'a> BarrierScope<'a, S, D, T> {
242    pub fn handle_events<E>(&mut self, handler: E)
243    where
244        E: EventHandler<T> + Send + 'a,
245    {
246        self.handle_events_with(EventProcessorFactory::create(handler));
247    }
248
249    pub fn handle_events_mut<E>(&mut self, handler: E)
250    where
251        E: EventHandlerMut<T> + Send + 'a,
252    {
253        self.handle_events_with_mut(EventProcessorFactory::create_mut(handler));
254    }
255
256    pub fn handle_events_with<E: EventProcessor<'a, T>>(&mut self, processor: E) {
257        self.cursors.push(processor.get_cursor());
258        let barrier = self
259            .sequencer
260            .create_sequence_barrier(&self.gating_sequences);
261
262        let runnable = processor.create(self.data_provider.clone(), barrier);
263        self.event_handlers.push(runnable);
264    }
265
266    pub fn handle_events_with_mut<E: EventProcessorMut<'a, T>>(&mut self, processor: E) {
267        self.cursors.push(processor.get_cursor());
268        let barrier = self
269            .sequencer
270            .create_sequence_barrier(&self.gating_sequences);
271
272        let runnable = processor.create(self.data_provider.clone(), barrier);
273        self.event_handlers.push(runnable);
274    }
275
276    pub fn with_barrier(mut self, f: impl FnOnce(&mut BarrierScope<'a, S, D, T>)) {
277        let mut scope = BarrierScope {
278            sequencer: self.sequencer,
279            data_provider: self.data_provider.clone(),
280            gating_sequences: self.cursors,
281            event_handlers: Vec::new(),
282            cursors: Vec::new(),
283            _element: Default::default(),
284        };
285
286        f(&mut scope);
287        self.event_handlers.append(&mut scope.event_handlers);
288    }
289}
290
291impl<'a, S: Sequencer + 'a, W: WaitingStrategy, D: DataProvider<T> + 'a, T: Send + Sync + 'a>
292    WithEventHandlers<'a, S, W, D, T>
293where
294    T: Send + Sync,
295{
296    pub fn with_barrier(
297        mut self,
298        f: impl FnOnce(&mut BarrierScope<'a, S, D, T>),
299    ) -> WithEventHandlers<'a, S, W, D, T> {
300        let mut scope = BarrierScope {
301            gating_sequences: self.gating_sequences.clone(),
302            cursors: Vec::new(),
303            sequencer: self.with_sequencer.sequencer,
304            data_provider: self
305                .with_sequencer
306                .with_waiting_strategy
307                .with_data_provider
308                .data_provider
309                .clone(),
310            event_handlers: Vec::new(),
311            _element: Default::default(),
312        };
313
314        f(&mut scope);
315        self.with_sequencer.sequencer = scope.sequencer;
316        self.event_handlers.append(&mut scope.event_handlers);
317        self.gating_sequences = scope.cursors;
318
319        self
320    }
321
322    pub fn build(
323        self,
324    ) -> (
325        impl EventProcessorExecutor<'a>,
326        impl EventProducer<'a, Item = T>,
327    ) {
328        self.build_with_executor::<ThreadedExecutor<'a>>()
329    }
330
331    pub fn build_with_executor<E: EventProcessorExecutor<'a>>(
332        mut self,
333    ) -> (E, impl EventProducer<'a, Item = T>) {
334        for gs in &self.gating_sequences {
335            self.with_sequencer.sequencer.add_gating_sequence(gs);
336        }
337        let executor = E::with_runnables(self.event_handlers);
338        let producer = Producer::new(
339            self.with_sequencer
340                .with_waiting_strategy
341                .with_data_provider
342                .data_provider
343                .clone(),
344            self.with_sequencer.sequencer,
345        );
346        (executor, producer)
347    }
348}