1use std::sync::Arc;
2
3use crate::{
4 executor::ThreadedExecutor,
5 processor::EventProcessorFactory,
6 producer::Producer,
7 ringbuffer::RingBuffer,
8 sequence::AtomicSequence,
9 sequencer::SingleProducerSequencer,
10 traits::{
11 DataProvider, EventHandler, EventProcessor, EventProcessorExecutor, EventProducer,
12 Runnable, Sequencer, WaitingStrategy,
13 },
14 waiting::{BusySpinWaitStrategy, YieldingWaitStrategy},
15 EventHandlerMut, EventProcessorMut,
16};
17
/// Entry point of the staged disruptor builder.
///
/// The type itself carries no state. Its associated functions
/// ([`DisruptorBuilder::new`] and [`DisruptorBuilder::with_ring_buffer`]) hand out
/// the first builder stage, [`WithDataProvider`].
#[derive(Debug)]
pub struct DisruptorBuilder {}
103
104pub struct WithDataProvider<D: DataProvider<T>, T>
105where
106 T: Send + Sync,
107{
108 data_provider: Arc<D>,
109 _marker: std::marker::PhantomData<T>,
110}
111
112pub struct WithWaitingStrategy<W: WaitingStrategy, D: DataProvider<T>, T>
113where
114 T: Send + Sync,
115{
116 with_data_provider: WithDataProvider<D, T>,
117 _waiting_strategy: std::marker::PhantomData<W>,
118}
119
120pub struct WithSequencer<S: Sequencer, W: WaitingStrategy, D: DataProvider<T>, T>
121where
122 T: Send + Sync,
123{
124 with_waiting_strategy: WithWaitingStrategy<W, D, T>,
125 sequencer: S,
126}
127
128pub struct BarrierScope<'a, S: Sequencer, D: DataProvider<T>, T> {
129 sequencer: S,
130 data_provider: Arc<D>,
131 gating_sequences: Vec<Arc<AtomicSequence>>,
132 cursors: Vec<Arc<AtomicSequence>>,
133 event_handlers: Vec<Box<dyn Runnable + 'a>>,
134 _element: std::marker::PhantomData<T>,
135}
136
137pub struct WithEventHandlers<'a, S: Sequencer, W: WaitingStrategy, D: DataProvider<T>, T>
138where
139 T: Send + Sync,
140{
141 with_sequencer: WithSequencer<S, W, D, T>,
142 event_handlers: Vec<Box<dyn Runnable + 'a>>,
143 gating_sequences: Vec<Arc<AtomicSequence>>,
144}
145
146impl DisruptorBuilder {
147 #[allow(clippy::new_ret_no_self)]
148 pub fn new<D: DataProvider<T>, T>(data_provider: Arc<D>) -> WithDataProvider<D, T>
149 where
150 T: Send + Sync,
151 {
152 WithDataProvider {
153 data_provider,
154 _marker: std::marker::PhantomData,
155 }
156 }
157
158 pub fn with_ring_buffer<T>(capacity: usize) -> WithDataProvider<RingBuffer<T>, T>
159 where
160 T: Default + Send + Sync,
161 {
162 Self::new(Arc::new(RingBuffer::new(capacity)))
163 }
164}
165
166impl<D: DataProvider<T>, T> WithDataProvider<D, T>
167where
168 T: Send + Sync,
169{
170 pub fn with_waiting_strategy<W: WaitingStrategy>(self) -> WithWaitingStrategy<W, D, T> {
171 WithWaitingStrategy {
172 with_data_provider: self,
173 _waiting_strategy: Default::default(),
174 }
175 }
176
177 pub fn with_busy_spin_waiting_strategy(
178 self,
179 ) -> WithWaitingStrategy<BusySpinWaitStrategy, D, T> {
180 self.with_waiting_strategy()
181 }
182
183 pub fn with_yielding_waiting_strategy(self) -> WithWaitingStrategy<YieldingWaitStrategy, D, T> {
184 self.with_waiting_strategy()
185 }
186}
187
188impl<W: WaitingStrategy, D: DataProvider<T>, T> WithWaitingStrategy<W, D, T>
189where
190 T: Send + Sync,
191{
192 pub fn with_sequencer<S: Sequencer>(self, sequencer: S) -> WithSequencer<S, W, D, T> {
193 WithSequencer {
194 with_waiting_strategy: self,
195 sequencer,
196 }
197 }
198
199 pub fn with_single_producer_sequencer(
200 self,
201 ) -> WithSequencer<SingleProducerSequencer<W>, W, D, T> {
202 let buffer_size = self.with_data_provider.data_provider.get_capacity();
203 self.with_sequencer(SingleProducerSequencer::new(buffer_size, W::new()))
204 }
205}
206
207impl<'a, S: Sequencer + 'a, W: WaitingStrategy, D: DataProvider<T> + 'a, T: Send + Sync + 'a>
208 WithSequencer<S, W, D, T>
209where
210 T: Send + Sync,
211{
212 pub fn with_barrier(
213 mut self,
214 f: impl FnOnce(&mut BarrierScope<'a, S, D, T>),
215 ) -> WithEventHandlers<'a, S, W, D, T> {
216 let cursor = self.sequencer.get_cursor();
217 let mut scope = BarrierScope {
218 sequencer: self.sequencer,
219 data_provider: self
220 .with_waiting_strategy
221 .with_data_provider
222 .data_provider
223 .clone(),
224 gating_sequences: vec![cursor],
225 event_handlers: Vec::new(),
226 cursors: Vec::new(),
227 _element: Default::default(),
228 };
229
230 f(&mut scope);
231 self.sequencer = scope.sequencer;
232
233 WithEventHandlers {
234 with_sequencer: self,
235 event_handlers: scope.event_handlers,
236 gating_sequences: scope.cursors,
237 }
238 }
239}
240
241impl<'a, S: Sequencer + 'a, D: DataProvider<T> + 'a, T: Send + 'a> BarrierScope<'a, S, D, T> {
242 pub fn handle_events<E>(&mut self, handler: E)
243 where
244 E: EventHandler<T> + Send + 'a,
245 {
246 self.handle_events_with(EventProcessorFactory::create(handler));
247 }
248
249 pub fn handle_events_mut<E>(&mut self, handler: E)
250 where
251 E: EventHandlerMut<T> + Send + 'a,
252 {
253 self.handle_events_with_mut(EventProcessorFactory::create_mut(handler));
254 }
255
256 pub fn handle_events_with<E: EventProcessor<'a, T>>(&mut self, processor: E) {
257 self.cursors.push(processor.get_cursor());
258 let barrier = self
259 .sequencer
260 .create_sequence_barrier(&self.gating_sequences);
261
262 let runnable = processor.create(self.data_provider.clone(), barrier);
263 self.event_handlers.push(runnable);
264 }
265
266 pub fn handle_events_with_mut<E: EventProcessorMut<'a, T>>(&mut self, processor: E) {
267 self.cursors.push(processor.get_cursor());
268 let barrier = self
269 .sequencer
270 .create_sequence_barrier(&self.gating_sequences);
271
272 let runnable = processor.create(self.data_provider.clone(), barrier);
273 self.event_handlers.push(runnable);
274 }
275
276 pub fn with_barrier(mut self, f: impl FnOnce(&mut BarrierScope<'a, S, D, T>)) {
277 let mut scope = BarrierScope {
278 sequencer: self.sequencer,
279 data_provider: self.data_provider.clone(),
280 gating_sequences: self.cursors,
281 event_handlers: Vec::new(),
282 cursors: Vec::new(),
283 _element: Default::default(),
284 };
285
286 f(&mut scope);
287 self.event_handlers.append(&mut scope.event_handlers);
288 }
289}
290
291impl<'a, S: Sequencer + 'a, W: WaitingStrategy, D: DataProvider<T> + 'a, T: Send + Sync + 'a>
292 WithEventHandlers<'a, S, W, D, T>
293where
294 T: Send + Sync,
295{
296 pub fn with_barrier(
297 mut self,
298 f: impl FnOnce(&mut BarrierScope<'a, S, D, T>),
299 ) -> WithEventHandlers<'a, S, W, D, T> {
300 let mut scope = BarrierScope {
301 gating_sequences: self.gating_sequences.clone(),
302 cursors: Vec::new(),
303 sequencer: self.with_sequencer.sequencer,
304 data_provider: self
305 .with_sequencer
306 .with_waiting_strategy
307 .with_data_provider
308 .data_provider
309 .clone(),
310 event_handlers: Vec::new(),
311 _element: Default::default(),
312 };
313
314 f(&mut scope);
315 self.with_sequencer.sequencer = scope.sequencer;
316 self.event_handlers.append(&mut scope.event_handlers);
317 self.gating_sequences = scope.cursors;
318
319 self
320 }
321
322 pub fn build(
323 self,
324 ) -> (
325 impl EventProcessorExecutor<'a>,
326 impl EventProducer<'a, Item = T>,
327 ) {
328 self.build_with_executor::<ThreadedExecutor<'a>>()
329 }
330
331 pub fn build_with_executor<E: EventProcessorExecutor<'a>>(
332 mut self,
333 ) -> (E, impl EventProducer<'a, Item = T>) {
334 for gs in &self.gating_sequences {
335 self.with_sequencer.sequencer.add_gating_sequence(gs);
336 }
337 let executor = E::with_runnables(self.event_handlers);
338 let producer = Producer::new(
339 self.with_sequencer
340 .with_waiting_strategy
341 .with_data_provider
342 .data_provider
343 .clone(),
344 self.with_sequencer.sequencer,
345 );
346 (executor, producer)
347 }
348}