//! `kovan_queue/disruptor.rs` — a Disruptor-pattern queue: sequences,
//! single- and multi-producer sequencers, wait strategies, a pre-allocated
//! ring buffer, and batch event processors.
1use crate::utils::CacheAligned;
2use kovan::Atom;
3use std::cell::UnsafeCell;
4use std::marker::PhantomData as marker;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
7use std::thread;
8
/// A sequence number in the Disruptor.
///
/// The counter is wrapped in `CacheAligned` so each `Sequence` occupies its
/// own cache line, preventing false sharing between the producer cursor and
/// consumer counters that are updated from different threads.
#[derive(Debug)]
pub struct Sequence {
    // Atomic counter; padded to a cache line by `CacheAligned`.
    value: CacheAligned<AtomicI64>,
}

impl Sequence {
    /// Creates a new sequence with the given initial value.
    ///
    /// The Disruptor convention is to start at `-1`, meaning "nothing
    /// claimed/consumed yet" (see the `Sequence::new(-1)` call sites below).
    pub fn new(initial: i64) -> Self {
        Sequence {
            value: CacheAligned::new(AtomicI64::new(initial)),
        }
    }

    /// Gets the current value of the sequence (Acquire load, so writes
    /// released by `set` on another thread are visible afterwards).
    pub fn get(&self) -> i64 {
        self.value.load(Ordering::Acquire)
    }

    /// Sets the value of the sequence (Release store, pairing with the
    /// Acquire load in `get`).
    pub fn set(&self, value: i64) {
        self.value.store(value, Ordering::Release);
    }

    /// Atomically compares and sets the value of the sequence.
    ///
    /// Returns `true` if the swap occurred. Uses `SeqCst` on success and
    /// `Relaxed` on failure (the failed value is discarded).
    pub fn compare_and_set(&self, current: i64, new: i64) -> bool {
        self.value
            .compare_exchange(current, new, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }
}
44
/// Sequencer trait for claiming and publishing slots.
pub trait Sequencer: Send + Sync {
    /// Claims the next sequence, waiting (spinning/yielding) until the slot
    /// is no longer needed by any gating (consumer) sequence.
    fn next(&self) -> i64;

    /// Publishes the given sequence, making it visible to consumers.
    fn publish(&self, sequence: i64);

    /// Gets the current cursor value (highest published sequence).
    ///
    /// NOTE(review): the multi-producer implementation returns the highest
    /// *claimed* sequence here, not the highest published one — callers must
    /// trim the result with `get_highest_published_sequence` before reading.
    fn get_cursor(&self) -> i64;

    /// Adds gating sequences (consumer positions the producer must not lap).
    fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>);

    /// Gets the highest sequence in `next_sequence..=available_sequence` up
    /// to which every slot is contiguously published and safe to read.
    fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64;
}
62
63/// Single Producer Sequencer.
64///
65/// Optimized for single-threaded publishing.
66pub struct SingleProducerSequencer {
67    /// The cursor for the sequencer (highest published sequence).
68    cursor: Arc<Sequence>,
69    /// The next sequence to be claimed.
70    next_sequence: Sequence,
71    /// Sequences to gate on (prevent wrapping).
72    gating_sequences: Atom<Vec<Arc<Sequence>>>,
73    /// Size of the buffer.
74    buffer_size: usize,
75    /// Strategy for waiting.
76    wait_strategy: Arc<dyn WaitStrategy>,
77}
78
79// SAFETY: SingleProducerSequencer is Send + Sync derived from its fields:
80// Atom<Vec<Arc<Sequence>>> is Send+Sync, Arc<Sequence> is Send+Sync,
81// AtomicI64/usize are Send+Sync, Arc<dyn WaitStrategy> is Send+Sync.
82// All fields' auto-trait impls make these bounds satisfied without unsafe overrides.
83
84impl SingleProducerSequencer {
85    pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
86        Self {
87            cursor: Arc::new(Sequence::new(-1)),
88            next_sequence: Sequence::new(-1),
89            gating_sequences: Atom::new(Vec::new()),
90            buffer_size,
91            wait_strategy,
92        }
93    }
94}
95
96impl Sequencer for SingleProducerSequencer {
97    fn next(&self) -> i64 {
98        let next = self.next_sequence.get() + 1;
99        self.next_sequence.set(next);
100
101        let wrap_point = next - self.buffer_size as i64;
102        // Load the gating-sequence list once; the AtomGuard keeps the
103        // snapshot alive (epoch-protected) for the duration of this call.
104        let gating_guard = self.gating_sequences.load();
105        let gating_sequences: &[Arc<Sequence>] = gating_guard.as_slice();
106
107        let min_seq =
108            |seqs: &[Arc<Sequence>]| seqs.iter().map(|s| s.get()).min().unwrap_or(i64::MAX);
109
110        let mut min_gating_sequence = min_seq(gating_sequences);
111
112        while wrap_point > min_gating_sequence {
113            thread::yield_now();
114            min_gating_sequence = min_seq(gating_sequences);
115        }
116
117        next
118    }
119
120    fn publish(&self, sequence: i64) {
121        self.cursor.set(sequence);
122        self.wait_strategy.signal_all_when_blocking();
123    }
124
125    fn get_cursor(&self) -> i64 {
126        self.cursor.get()
127    }
128
129    fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
130        // rcu: load current list, append, store. Retries automatically on CAS
131        // failure from concurrent callers. The closure may run more than once.
132        self.gating_sequences.rcu(|current| {
133            let mut new_list = current.clone();
134            new_list.extend(sequences.iter().cloned());
135            new_list
136        });
137    }
138
139    fn get_highest_published_sequence(&self, _next_sequence: i64, available_sequence: i64) -> i64 {
140        available_sequence
141    }
142}
143
144/// Multi Producer Sequencer.
145///
146/// Thread-safe for multiple producers.
147pub struct MultiProducerSequencer {
148    /// Sequences to gate on.
149    gating_sequences: Atom<Vec<Arc<Sequence>>>,
150    /// Size of the buffer.
151    buffer_size: usize,
152    /// Strategy for waiting.
153    wait_strategy: Arc<dyn WaitStrategy>,
154    /// The sequence used for claiming slots.
155    claim_sequence: AtomicI64,
156    /// Buffer tracking published slots for availability.
157    available_buffer: Box<[AtomicI64]>,
158    /// Mask for fast modulo operations.
159    mask: usize,
160}
161
162// SAFETY: MultiProducerSequencer is Send + Sync derived from its fields:
163// Atom<Vec<Arc<Sequence>>> is Send+Sync, AtomicI64 is Send+Sync,
164// Box<[AtomicI64]> is Send+Sync, Arc<dyn WaitStrategy> is Send+Sync.
165
166impl MultiProducerSequencer {
167    pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
168        let mut available_buffer = Vec::with_capacity(buffer_size);
169        for _ in 0..buffer_size {
170            available_buffer.push(AtomicI64::new(-1));
171        }
172
173        Self {
174            gating_sequences: Atom::new(Vec::new()),
175            buffer_size,
176            wait_strategy,
177            claim_sequence: AtomicI64::new(-1),
178            available_buffer: available_buffer.into_boxed_slice(),
179            mask: buffer_size - 1,
180        }
181    }
182}
183
184impl Sequencer for MultiProducerSequencer {
185    fn next(&self) -> i64 {
186        let current = self.claim_sequence.fetch_add(1, Ordering::SeqCst);
187        let next = current + 1;
188
189        let wrap_point = next - self.buffer_size as i64;
190        let gating_guard = self.gating_sequences.load();
191        let gating_sequences: &[Arc<Sequence>] = gating_guard.as_slice();
192
193        let min_seq =
194            |seqs: &[Arc<Sequence>]| seqs.iter().map(|s| s.get()).min().unwrap_or(i64::MAX);
195
196        let mut min_gating_sequence = min_seq(gating_sequences);
197
198        while wrap_point > min_gating_sequence {
199            thread::yield_now();
200            min_gating_sequence = min_seq(gating_sequences);
201        }
202
203        next
204    }
205
206    fn publish(&self, sequence: i64) {
207        let index = (sequence as usize) & self.mask;
208        self.available_buffer[index].store(sequence, Ordering::Release);
209        self.wait_strategy.signal_all_when_blocking();
210    }
211
212    fn get_cursor(&self) -> i64 {
213        self.claim_sequence.load(Ordering::Relaxed)
214    }
215
216    fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
217        self.gating_sequences.rcu(|current| {
218            let mut new_list = current.clone();
219            new_list.extend(sequences.iter().cloned());
220            new_list
221        });
222    }
223
224    fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64 {
225        // Check if all sequences up to the requested one are published.
226        // In MP, we must ensure contiguous availability.
227
228        let mut sequence = next_sequence;
229        while sequence <= available_sequence {
230            if !self.is_published(sequence) {
231                return sequence - 1;
232            }
233            sequence += 1;
234        }
235        available_sequence
236    }
237}
238
impl MultiProducerSequencer {
    /// Returns `true` if `sequence` has been published.
    ///
    /// The availability buffer stores the full sequence number for each
    /// slot, so the equality check distinguishes this lap from earlier
    /// sequences that mapped to the same index (those differ by a multiple
    /// of the buffer size). Acquire pairs with the Release in `publish`.
    fn is_published(&self, sequence: i64) -> bool {
        let index = (sequence as usize) & self.mask;
        self.available_buffer[index].load(Ordering::Acquire) == sequence
    }
}
245
/// Strategy for waiting for a sequence to be available.
pub trait WaitStrategy: Send + Sync {
    /// Waits for the given sequence to be available.
    ///
    /// Returns the observed cursor value (>= `sequence`) on success, or
    /// `Err(AlertException)` if `barrier` was alerted while waiting.
    fn wait_for(
        &self,
        sequence: i64,
        cursor: &Arc<dyn Sequencer>,
        dependent: &Arc<dyn Sequencer>,
        barrier: &ProcessingSequenceBarrier,
    ) -> Result<i64, AlertException>;

    /// Signals all waiting threads that the cursor has advanced.
    /// A no-op for the non-blocking strategies in this file.
    fn signal_all_when_blocking(&self);
}

/// Returned by [`WaitStrategy::wait_for`] when the barrier is alerted —
/// used to break consumers out of their wait loops (e.g. on shutdown).
#[derive(Debug, Clone, Copy)]
pub struct AlertException;
263
264/// Busy Spin Wait Strategy.
265///
266/// Low latency but high CPU usage.
267pub struct BusySpinWaitStrategy;
268
269impl WaitStrategy for BusySpinWaitStrategy {
270    fn wait_for(
271        &self,
272        sequence: i64,
273        _cursor: &Arc<dyn Sequencer>,
274        dependent: &Arc<dyn Sequencer>,
275        barrier: &ProcessingSequenceBarrier,
276    ) -> Result<i64, AlertException> {
277        let mut available_sequence;
278        loop {
279            if barrier.is_alerted() {
280                return Err(AlertException);
281            }
282            available_sequence = dependent.get_cursor();
283            if available_sequence >= sequence {
284                return Ok(available_sequence);
285            }
286            std::hint::spin_loop();
287        }
288    }
289
290    fn signal_all_when_blocking(&self) {}
291}
292
293/// Yielding Wait Strategy.
294///
295/// Compromise between latency and CPU usage.
296pub struct YieldingWaitStrategy;
297
298impl WaitStrategy for YieldingWaitStrategy {
299    fn wait_for(
300        &self,
301        sequence: i64,
302        _cursor: &Arc<dyn Sequencer>,
303        dependent: &Arc<dyn Sequencer>,
304        barrier: &ProcessingSequenceBarrier,
305    ) -> Result<i64, AlertException> {
306        let mut counter = 100;
307        let mut available_sequence;
308        loop {
309            if barrier.is_alerted() {
310                return Err(AlertException);
311            }
312            available_sequence = dependent.get_cursor();
313            if available_sequence >= sequence {
314                return Ok(available_sequence);
315            }
316
317            counter -= 1;
318            if counter == 0 {
319                thread::yield_now();
320                counter = 100;
321            } else {
322                std::hint::spin_loop();
323            }
324        }
325    }
326
327    fn signal_all_when_blocking(&self) {}
328}
329
/// Blocking Wait Strategy.
///
/// Parks waiters on a condition variable guarded by a mutex. Lowest CPU
/// usage of the strategies in this file.
#[derive(Default)]
pub struct BlockingWaitStrategy {
    // The mutex guards nothing but the wait protocol itself (`()` payload);
    // it exists to pair with `condvar`.
    mutex: std::sync::Mutex<()>,
    condvar: std::sync::Condvar,
}

impl BlockingWaitStrategy {
    /// Creates a new blocking wait strategy.
    pub fn new() -> Self {
        // `Mutex::<()>::default()` and `Condvar::default()` are exactly
        // `Mutex::new(())` and `Condvar::new()`.
        Self::default()
    }
}
353
impl WaitStrategy for BlockingWaitStrategy {
    /// Blocks until `dependent`'s cursor reaches `sequence`, or returns
    /// `Err(AlertException)` if `barrier` is alerted while waiting.
    fn wait_for(
        &self,
        sequence: i64,
        _cursor: &Arc<dyn Sequencer>,
        dependent: &Arc<dyn Sequencer>,
        barrier: &ProcessingSequenceBarrier,
    ) -> Result<i64, AlertException> {
        // Fast path: check once without taking the lock.
        let mut available_sequence = dependent.get_cursor();
        if available_sequence < sequence {
            // The condition is re-checked while holding the mutex; because
            // `signal_all_when_blocking` also takes the mutex before
            // notifying, a publish cannot slip between this check and the
            // `wait` below — no wakeup is lost.
            let mut guard = self.mutex.lock().unwrap();
            while dependent.get_cursor() < sequence {
                if barrier.is_alerted() {
                    return Err(AlertException);
                }
                // `wait` atomically releases the mutex while parked and
                // re-acquires it before returning; looping re-checks the
                // condition, which also tolerates spurious wakeups.
                guard = self.condvar.wait(guard).unwrap();
            }
            available_sequence = dependent.get_cursor();
        }

        // Final confirmation loop (runs outside the lock): yields until the
        // locally observed cursor value is >= `sequence`, still honouring
        // the alert flag.
        while available_sequence < sequence {
            if barrier.is_alerted() {
                return Err(AlertException);
            }
            available_sequence = dependent.get_cursor();
            thread::yield_now();
        }

        Ok(available_sequence)
    }

    /// Wakes every thread parked in [`Self::wait_for`].
    fn signal_all_when_blocking(&self) {
        // Taking the lock before notifying pairs with the locked re-check in
        // `wait_for`, closing the lost-wakeup window.
        let _guard = self.mutex.lock().unwrap();
        self.condvar.notify_all();
    }
}
393
/// Coordination barrier for tracking dependencies.
pub struct ProcessingSequenceBarrier {
    /// Strategy for waiting.
    wait_strategy: Arc<dyn WaitStrategy>,
    /// The sequencer whose progress this barrier waits on (the dependency).
    dependent_sequencer: Arc<dyn Sequencer>,
    /// The ring buffer's own sequencer (cursor); used in `wait_for` to trim
    /// the result down to the contiguously published sequence.
    cursor_sequencer: Arc<dyn Sequencer>,
    /// Set by `alert()`; waiters poll it via `is_alerted()` and abort with
    /// `AlertException` when it is true.
    alerted: AtomicBool,
}
405
406impl ProcessingSequenceBarrier {
407    /// Creates a new processing sequence barrier.
408    pub fn new(
409        wait_strategy: Arc<dyn WaitStrategy>,
410        dependent_sequencer: Arc<dyn Sequencer>,
411        cursor_sequencer: Arc<dyn Sequencer>,
412    ) -> Self {
413        Self {
414            wait_strategy,
415            dependent_sequencer,
416            cursor_sequencer,
417            alerted: AtomicBool::new(false),
418        }
419    }
420
421    /// Waits for the given sequence to be available.
422    pub fn wait_for(&self, sequence: i64) -> Result<i64, AlertException> {
423        let available = self.wait_strategy.wait_for(
424            sequence,
425            &self.cursor_sequencer,
426            &self.dependent_sequencer,
427            self,
428        )?;
429
430        // Ensure the sequence is fully published (crucial for MP).
431        Ok(self
432            .cursor_sequencer
433            .get_highest_published_sequence(sequence, available))
434    }
435
436    /// Returns true if the barrier has been alerted.
437    pub fn is_alerted(&self) -> bool {
438        self.alerted.load(Ordering::Acquire)
439    }
440
441    /// Alerts the barrier, causing waiters to wake up.
442    pub fn alert(&self) {
443        self.alerted.store(true, Ordering::Release);
444        self.wait_strategy.signal_all_when_blocking();
445    }
446
447    /// Clears the alert status.
448    pub fn clear_alert(&self) {
449        self.alerted.store(false, Ordering::Release);
450    }
451}
452
/// Event Handler trait: consumer-side callback for processed events.
pub trait EventHandler<T>: Send + Sync {
    /// Called when an event is available for processing.
    ///
    /// `sequence` is the event's slot sequence; `end_of_batch` is `true`
    /// for the last event of the currently available batch.
    fn on_event(&self, event: &T, sequence: u64, end_of_batch: bool);
}
458
/// Ring Buffer.
///
/// A fixed-capacity circular buffer of pre-allocated events, indexed by
/// sequence number via bit-masking (capacity is a power of two).
pub struct RingBuffer<T> {
    /// The pre-allocated event slots. `UnsafeCell` lets a producer mutate a
    /// claimed slot through a shared reference (see the unsafe accessors).
    buffer: Box<[UnsafeCell<T>]>,
    /// `capacity - 1`; valid as a modulo mask because capacity is a power of two.
    mask: usize,
    /// The sequencer coordinating claim/publish for this buffer.
    sequencer: Arc<dyn Sequencer>,
}

// SAFETY: sending the buffer to another thread moves the T values with it,
// so `T: Send` suffices for `RingBuffer<T>: Send`.
unsafe impl<T: Send> Send for RingBuffer<T> {}
// SAFETY: Multiple consumers hold `&RingBuffer<T>` concurrently and can read `&T`
// references to the same slot, so `T` must be `Sync` in addition to `Send`.
unsafe impl<T: Send + Sync> Sync for RingBuffer<T> {}
473
474impl<T> RingBuffer<T> {
475    /// Creates a new ring buffer with the given factory, size, and sequencer.
476    pub fn new<F>(factory: F, size: usize, sequencer: Arc<dyn Sequencer>) -> Self
477    where
478        F: Fn() -> T,
479    {
480        let capacity = size.next_power_of_two();
481        let mut buffer = Vec::with_capacity(capacity);
482        for _ in 0..capacity {
483            buffer.push(UnsafeCell::new(factory()));
484        }
485
486        Self {
487            buffer: buffer.into_boxed_slice(),
488            mask: capacity - 1,
489            sequencer,
490        }
491    }
492
493    /// Adds gating sequences to the sequencer.
494    pub fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
495        self.sequencer.add_gating_sequences(sequences);
496    }
497
498    /// Gets a reference to the event at the given sequence.
499    ///
500    /// # Safety
501    ///
502    /// The caller must guarantee that `sequence` has been fully published by the
503    /// producer (i.e., `publish(sequence)` was called and observed) **and** that
504    /// no other thread is concurrently writing to the same slot.  Within the
505    /// Disruptor protocol this is established by waiting on a
506    /// [`ProcessingSequenceBarrier`] before calling this method.  Calling it with
507    /// an unpublished sequence or while a producer holds the slot is undefined
508    /// behaviour.
509    pub unsafe fn get(&self, sequence: i64) -> &T {
510        unsafe { &*self.buffer[(sequence as usize) & self.mask].get() }
511    }
512
513    /// Gets a mutable reference to the event at the given sequence.
514    pub fn get_mut(&mut self, sequence: i64) -> &mut T {
515        unsafe { &mut *self.buffer[(sequence as usize) & self.mask].get() }
516    }
517
518    /// Unsafe access for high performance (internal use).
519    ///
520    /// # Safety
521    ///
522    /// The caller must ensure that the sequence number is valid and that
523    /// no other thread is concurrently modifying the same slot.
524    pub unsafe fn get_unchecked(&self, sequence: i64) -> &T {
525        unsafe {
526            &*self
527                .buffer
528                .get_unchecked((sequence as usize) & self.mask)
529                .get()
530        }
531    }
532
533    /// Unsafe mutable access for high performance (internal use).
534    ///
535    /// # Safety
536    ///
537    /// The caller must ensure that the sequence number is valid and that
538    /// they have exclusive access to the slot (e.g., via the Disruptor protocol).
539    #[allow(clippy::mut_from_ref)]
540    pub unsafe fn get_unchecked_mut(&self, sequence: i64) -> &mut T {
541        unsafe {
542            &mut *self
543                .buffer
544                .get_unchecked((sequence as usize) & self.mask)
545                .get()
546        }
547    }
548
549    /// Claims the next sequence in the ring buffer.
550    pub fn next(&self) -> i64 {
551        self.sequencer.next()
552    }
553
554    /// Publishes the sequence, making it available to consumers.
555    pub fn publish(&self, sequence: i64) {
556        self.sequencer.publish(sequence);
557    }
558}
559
/// Producer handle.
///
/// When dropped, alerts all consumer barriers and joins the consumer threads,
/// ensuring a clean shutdown of the disruptor pipeline.
pub struct Producer<T> {
    /// Shared ring buffer events are published into.
    ring_buffer: Arc<RingBuffer<T>>,
    /// Barriers for each consumer — used to alert them on shutdown.
    barriers: Vec<Arc<ProcessingSequenceBarrier>>,
    /// Join handles for consumer threads; `Option` so `Drop` can `take()`
    /// them out of `&mut self` and join each exactly once.
    join_handles: Vec<Option<thread::JoinHandle<()>>>,
}
571
572impl<T> Producer<T> {
573    /// Publishes an event to the ring buffer.
574    pub fn publish<F>(&mut self, update: F)
575    where
576        F: FnOnce(&mut T),
577    {
578        let sequence = self.ring_buffer.next();
579        // SAFETY: We have claimed the sequence, so we have exclusive access to this slot.
580        let event = unsafe { self.ring_buffer.get_unchecked_mut(sequence) };
581        update(event);
582        self.ring_buffer.publish(sequence);
583    }
584}
585
586impl<T> Drop for Producer<T> {
587    fn drop(&mut self) {
588        // Alert all consumer barriers so they break out of their wait loops.
589        for barrier in &self.barriers {
590            barrier.alert();
591        }
592        // Join all consumer threads to ensure clean shutdown.
593        for handle in &mut self.join_handles {
594            if let Some(h) = handle.take() {
595                let _ = h.join();
596            }
597        }
598    }
599}
600
/// Batch Event Processor.
///
/// A consumer that drains all available events in batches, updating its own
/// `sequence` once per batch so producers can reclaim the slots.
pub struct BatchEventProcessor<T> {
    /// The ring buffer to read from.
    ring_buffer: Arc<RingBuffer<T>>,
    /// The sequence of this processor (last fully processed event); shared
    /// with the sequencer as a gating sequence via `get_sequence`.
    sequence: Arc<Sequence>,
    /// The barrier to wait on for available events and shutdown alerts.
    barrier: Arc<ProcessingSequenceBarrier>,
    /// The handler invoked for each event.
    handler: Arc<dyn EventHandler<T>>,
}
612
613impl<T> BatchEventProcessor<T> {
614    /// Creates a new batch event processor.
615    pub fn new(
616        ring_buffer: Arc<RingBuffer<T>>,
617        barrier: Arc<ProcessingSequenceBarrier>,
618        handler: Arc<dyn EventHandler<T>>,
619    ) -> Self {
620        Self {
621            ring_buffer,
622            sequence: Arc::new(Sequence::new(-1)),
623            barrier,
624            handler,
625        }
626    }
627
628    /// Gets the sequence of the processor.
629    pub fn get_sequence(&self) -> Arc<Sequence> {
630        self.sequence.clone()
631    }
632
633    /// Runs the processor loop.
634    pub fn run(&self) {
635        let mut next_sequence = self.sequence.get() + 1;
636        loop {
637            match self.barrier.wait_for(next_sequence) {
638                Ok(available_sequence) => {
639                    while next_sequence <= available_sequence {
640                        // SAFETY: The barrier guarantees that the sequence is available for reading.
641                        let event = unsafe { self.ring_buffer.get_unchecked(next_sequence) };
642                        self.handler.on_event(
643                            event,
644                            next_sequence as u64,
645                            next_sequence == available_sequence,
646                        );
647                        next_sequence += 1;
648                    }
649                    self.sequence.set(available_sequence);
650                }
651                Err(_) => {
652                    if self.barrier.is_alerted() {
653                        break;
654                    }
655                }
656            }
657        }
658    }
659}
660
/// Disruptor Facade.
///
/// Owns the ring buffer and registered processors; configured through
/// [`DisruptorBuilder`] and launched with `start`.
pub struct Disruptor<T> {
    /// The ring buffer.
    ring_buffer: Arc<RingBuffer<T>>,
    /// The registered event processors (one per handler).
    processors: Vec<Arc<BatchEventProcessor<T>>>,
    /// Whether the disruptor has been started.
    // NOTE(review): written by `start()` but never read in this file —
    // candidate for removal or for guarding double-starts.
    started: bool,
    /// The wait strategy shared with the sequencer and all barriers.
    wait_strategy: Arc<dyn WaitStrategy>,
}

/// Selects which sequencer implementation the builder constructs.
pub enum ProducerType {
    /// Exactly one publishing thread (`SingleProducerSequencer`).
    Single,
    /// Multiple publishing threads (`MultiProducerSequencer`).
    Multi,
}
677
/// Builder for configuring and constructing a [`Disruptor`].
pub struct DisruptorBuilder<T, F> {
    /// Factory for creating (pre-filling) events.
    factory: F,
    /// Requested size of the ring buffer; the ring's actual capacity is
    /// rounded up to a power of two in `RingBuffer::new`.
    buffer_size: usize,
    /// Wait strategy to use.
    wait_strategy: Arc<dyn WaitStrategy>,
    /// Type of producer (Single or Multi).
    producer_type: ProducerType,
    // Ties the builder to the event type `T` without storing a value.
    // (`marker` is this file's import alias for `PhantomData`.)
    marker: marker<T>,
}
689
690impl<T, F> DisruptorBuilder<T, F>
691where
692    F: Fn() -> T,
693{
694    /// Creates a new builder with the given factory.
695    pub fn new(factory: F) -> Self {
696        Self {
697            factory,
698            buffer_size: 1024,
699            wait_strategy: Arc::new(BusySpinWaitStrategy),
700            producer_type: ProducerType::Single,
701            marker: marker::<T>,
702        }
703    }
704
705    /// Sets the buffer size.
706    pub fn buffer_size(mut self, size: usize) -> Self {
707        self.buffer_size = size;
708        self
709    }
710
711    /// Sets the wait strategy.
712    pub fn wait_strategy<W: WaitStrategy + 'static>(mut self, strategy: W) -> Self {
713        self.wait_strategy = Arc::new(strategy);
714        self
715    }
716
717    /// Sets the producer type to Single.
718    pub fn single_producer(mut self) -> Self {
719        self.producer_type = ProducerType::Single;
720        self
721    }
722
723    /// Sets the producer type to Multi.
724    pub fn multi_producer(mut self) -> Self {
725        self.producer_type = ProducerType::Multi;
726        self
727    }
728
729    /// Builds the Disruptor.
730    pub fn build(self) -> Disruptor<T> {
731        let sequencer: Arc<dyn Sequencer> = match self.producer_type {
732            ProducerType::Single => Arc::new(SingleProducerSequencer::new(
733                self.buffer_size,
734                self.wait_strategy.clone(),
735            )),
736            ProducerType::Multi => Arc::new(MultiProducerSequencer::new(
737                self.buffer_size,
738                self.wait_strategy.clone(),
739            )),
740        };
741
742        let ring_buffer = Arc::new(RingBuffer::new(self.factory, self.buffer_size, sequencer));
743        Disruptor {
744            ring_buffer,
745            processors: Vec::new(),
746            started: false,
747            wait_strategy: self.wait_strategy,
748        }
749    }
750}
751
752impl<T: Send + Sync + 'static> Disruptor<T> {
753    /// Creates a new builder for the Disruptor.
754    pub fn builder<F>(factory: F) -> DisruptorBuilder<T, F>
755    where
756        F: Fn() -> T,
757    {
758        DisruptorBuilder::new(factory)
759    }
760
761    /// Registers an event handler.
762    pub fn handle_events_with<H: EventHandler<T> + 'static>(&mut self, handler: H) -> &mut Self {
763        // Create a barrier that waits on the ring buffer's sequencer.
764        // Initially, the barrier depends on the sequencer itself (producers).
765
766        let barrier = Arc::new(ProcessingSequenceBarrier::new(
767            self.wait_strategy.clone(),
768            self.ring_buffer.sequencer.clone(), // Dependent on cursor (producer)
769            self.ring_buffer.sequencer.clone(),
770        ));
771
772        let processor = Arc::new(BatchEventProcessor::new(
773            self.ring_buffer.clone(),
774            barrier,
775            Arc::new(handler),
776        ));
777
778        self.processors.push(processor);
779        self
780    }
781
782    /// Starts the Disruptor and returns a Producer.
783    ///
784    /// Consumer threads are spawned for each registered handler. When the
785    /// returned `Producer` is dropped, it alerts all consumer barriers and
786    /// joins the threads, ensuring a clean shutdown.
787    pub fn start(mut self) -> Producer<T> {
788        let mut gating_sequences = Vec::new();
789        let mut barriers = Vec::new();
790        let mut join_handles = Vec::new();
791
792        for processor in &self.processors {
793            gating_sequences.push(processor.get_sequence());
794            barriers.push(processor.barrier.clone());
795            let p = processor.clone();
796            join_handles.push(Some(thread::spawn(move || {
797                p.run();
798            })));
799        }
800
801        // RingBuffer::add_gating_sequences uses interior mutability in the Sequencer.
802        self.ring_buffer.add_gating_sequences(gating_sequences);
803
804        self.started = true;
805
806        Producer {
807            ring_buffer: self.ring_buffer,
808            barriers,
809            join_handles,
810        }
811    }
812}