Skip to main content

disruptor_mp/
consumer.rs

1//! Multi-process consumer implementation.
2//!
3//! This module provides the [`SharedConsumer`] type for consuming events from a shared memory
4//! ring buffer created by a producer in another process. Each consumer maintains its own
5//! sequence tracking and supports both manual and automatic event processing modes.
6
7use crate::{SharedCursor, SharedRingBuffer};
8use disruptor_core::Sequence;
9use std::ops::Deref;
10use std::sync::atomic::Ordering;
11
12/// Consumer for multi-process disruptor with broadcast semantics
13/// Each consumer maintains its own sequence and sees all events
14pub struct SharedConsumer<E> {
15    ring_buffer: SharedRingBuffer<E>,
16    producer_sequence: SharedCursor,
17    /// This consumer's own sequence (last event it processed)
18    consumer_sequence: SharedCursor,
19    /// Consumer ID for this instance
20    consumer_id: String,
21    /// Last sequence processed by THIS consumer
22    last_processed_sequence: Sequence,
23    /// Consumer readiness counter for internal coordination (optional)
24    consumers_ready: Option<SharedCursor>,
25    /// Aeron-style hot-path counters (RFC 0040). All-`None` while no
26    /// counters file is attached; `attach_counters` populates them.
27    counters: ConsumerCounters,
28}
29
30/// Consumer-side observability handles. Each field is a relaxed-atomic
31/// counter sitting in a shared `CountersFile`. RFC 0040 §Counters defines
32/// the canonical IDs.
33#[derive(Default, Debug)]
34pub struct ConsumerCounters {
35    /// `events_consumed` — successful `try_consume_next` (and friends).
36    pub events_consumed: Option<crate::observability::CounterHandle>,
37    /// `consumer_empty_spins` — `try_consume_next` saw ring empty.
38    pub consumer_empty_spins: Option<crate::observability::CounterHandle>,
39    /// `consumer_lag_max` — high-water mark of `producer_seq − consumer_seq`.
40    pub consumer_lag_max: Option<crate::observability::CounterHandle>,
41}
42
/// Select which consumer-side counters are attached to a shared counters file.
///
/// The full set is appropriate for normal observability. Targeted microbenchmarks can
/// deliberately narrow the set to reduce hot-path perturbation while still exposing the
/// counters required for a specific experiment.
///
/// The type is plain `Copy` data, so selections can be compared directly, e.g.
/// `selection == ConsumerCounterSelection::FULL`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ConsumerCounterSelection {
    /// Attach the monotonic `events_consumed` counter.
    pub events_consumed: bool,
    /// Attach the `consumer_empty_spins` counter.
    pub consumer_empty_spins: bool,
    /// Attach the `consumer_lag_max` high-water-mark counter.
    pub consumer_lag_max: bool,
}

impl ConsumerCounterSelection {
    /// Full RFC-0040 consumer counter set: every counter is attached.
    pub const FULL: Self = Self {
        events_consumed: true,
        consumer_empty_spins: true,
        consumer_lag_max: true,
    };

    /// Minimal consumer counter set for experiments that only need consumption progress.
    ///
    /// Only `events_consumed` is attached; the empty-spin and lag counters are skipped.
    pub const LITE: Self = Self {
        events_consumed: true,
        consumer_empty_spins: false,
        consumer_lag_max: false,
    };
}
73
74pub struct SharedConsumerLease<'a, E>
75where
76    E: Copy + Default,
77{
78    consumer: &'a mut SharedConsumer<E>,
79    sequence: Sequence,
80    event_ptr: *const E,
81}
82
83impl<E> SharedConsumerLease<'_, E>
84where
85    E: Copy + Default,
86{
87    pub fn sequence(&self) -> Sequence {
88        self.sequence
89    }
90}
91
92impl<E> Deref for SharedConsumerLease<'_, E>
93where
94    E: Copy + Default,
95{
96    type Target = E;
97
98    fn deref(&self) -> &Self::Target {
99        // Safety: the lease keeps the consumer sequence unpublished until drop,
100        // so the producer cannot reuse the slot backing `event_ptr`.
101        unsafe { &*self.event_ptr }
102    }
103}
104
105impl<E> Drop for SharedConsumerLease<'_, E>
106where
107    E: Copy + Default,
108{
109    fn drop(&mut self) {
110        self.consumer.publish_consumed_sequence(self.sequence);
111        if let Some(h) = &self.consumer.counters.events_consumed {
112            h.inc();
113        }
114    }
115}
116
117impl<E> SharedConsumer<E>
118where
119    E: Copy + Default,
120{
121    pub(crate) fn new_with_coordination(
122        ring_buffer: SharedRingBuffer<E>,
123        producer_sequence: SharedCursor,
124        consumer_sequence: SharedCursor,
125        consumer_id: String,
126        base_name: Option<String>,
127    ) -> Self {
128        assert!(!consumer_id.is_empty(), "consumer_id must not be empty");
129
130        // Try to attach to coordination structure if base_name is provided
131        let consumers_ready = base_name.as_ref().and_then(|name| {
132            // Use shorter name for macOS compatibility (error 63 = name too long)
133            let coordination_name = format!("{}_cr", name);
134            SharedCursor::attach(&coordination_name).ok()
135        });
136
137        let mut consumer = Self {
138            ring_buffer,
139            producer_sequence,
140            consumer_sequence,
141            consumer_id,
142            last_processed_sequence: -1,
143            consumers_ready,
144            counters: ConsumerCounters::default(),
145        };
146
147        consumer.last_processed_sequence = consumer.consumer_sequence.load(Ordering::Acquire);
148
149        // Automatically signal readiness if coordination is available
150        consumer.signal_readiness();
151
152        consumer
153    }
154
155    /// Signal consumer readiness (internal coordination)
156    /// This is called automatically when the consumer is created
157    pub fn signal_readiness(&self) {
158        if let Some(consumers_ready) = &self.consumers_ready {
159            // AcqRel preserves readiness-count monotonicity across processes.
160            consumers_ready.fetch_add(1, Ordering::AcqRel);
161        }
162    }
163
164    /// Try to attach to coordination structure (retry mechanism for timing issues)
165    pub fn try_attach_coordination(&mut self, base_name: &str) -> bool {
166        assert!(!base_name.is_empty(), "base_name must not be empty");
167
168        if self.consumers_ready.is_some() {
169            return true; // Already attached
170        }
171
172        // Use shorter name for macOS compatibility (error 63 = name too long)
173        let coordination_name = format!("{}_cr", base_name);
174        if let Ok(cursor) = SharedCursor::attach(&coordination_name) {
175            self.consumers_ready = Some(cursor);
176            self.signal_readiness(); // Signal readiness now that we're attached
177            return true;
178        }
179        false
180    }
181
182    /// Check if this consumer has coordination support
183    pub fn has_coordination_support(&self) -> bool {
184        self.consumers_ready.is_some()
185    }
186
187    /// Register consumer counters in the supplied counters file and
188    /// store handles on this consumer. After this call, hot-path
189    /// `try_consume_next` operations record into the file with one
190    /// relaxed atomic increment per event. RFC 0040 §Counters.
191    pub fn attach_counters(&mut self, file: &crate::observability::CountersFile) {
192        self.attach_counters_selected(file, ConsumerCounterSelection::FULL);
193    }
194
195    /// Register a selected subset of consumer counters in the supplied counters file.
196    pub fn attach_counters_selected(
197        &mut self,
198        file: &crate::observability::CountersFile,
199        selection: ConsumerCounterSelection,
200    ) {
201        use crate::observability::{ids, COUNTER_FLAG_CONSUMER};
202        self.counters.events_consumed = if selection.events_consumed {
203            file.register(
204                ids::EVENTS_CONSUMED,
205                COUNTER_FLAG_CONSUMER,
206                "events_consumed",
207            )
208        } else {
209            None
210        };
211        self.counters.consumer_empty_spins = if selection.consumer_empty_spins {
212            file.register(
213                ids::CONSUMER_EMPTY_SPINS,
214                COUNTER_FLAG_CONSUMER,
215                "consumer_empty_spins",
216            )
217        } else {
218            None
219        };
220        self.counters.consumer_lag_max = if selection.consumer_lag_max {
221            file.register(
222                ids::CONSUMER_LAG_MAX,
223                COUNTER_FLAG_CONSUMER,
224                "consumer_lag_max",
225            )
226        } else {
227            None
228        };
229    }
230
231    /// Read-only access to the consumer's attached counters.
232    pub fn counters(&self) -> &ConsumerCounters {
233        &self.counters
234    }
235
236    /// Record a consume-side latency sample (nanoseconds) through the
237    /// `metrics`-rs facade under the histogram name
238    /// `disruptor_mp_consume_latency_ns`. The downstream recorder
239    /// (Prometheus / OTLP / Debugging) decides how to aggregate the
240    /// distribution.
241    ///
242    /// Cost: one `metrics::histogram!` call (per-process recorder
243    /// dispatch). When no recorder is installed the call is a no-op.
244    /// When the `metrics` feature is off this method compiles to a
245    /// no-op; it stays in the API so call-sites don't need to be
246    /// `cfg`-gated.
247    #[inline]
248    pub fn record_consume_latency_ns(&self, ns: u64) {
249        #[cfg(feature = "metrics")]
250        metrics::histogram!("disruptor_mp_consume_latency_ns").record(ns as f64);
251        #[cfg(not(feature = "metrics"))]
252        let _ = ns;
253    }
254
255    /// Try to consume the next available event for this consumer
256    /// Returns None if no new events are available
257    pub fn try_consume_next(&mut self) -> Option<(Sequence, E)> {
258        // Acquire load enforces visibility for producer progress before deciding
259        // whether the next slot is safe to consume.
260        let Some((next_sequence, upper)) = self.available_batch_bounds() else {
261            if let Some(h) = &self.counters.consumer_empty_spins {
262                h.inc();
263            }
264            return None;
265        };
266
267        let event_ptr = self.ring_buffer.get(next_sequence);
268        let event = unsafe { *event_ptr }; // Copy the event
269
270        self.publish_consumed_sequence(next_sequence);
271        if let Some(h) = &self.counters.events_consumed {
272            h.inc();
273        }
274        if let Some(h) = &self.counters.consumer_lag_max {
275            // upper - next_sequence is current lag from this consumer's view.
276            let lag = (upper - next_sequence).max(0) as u64;
277            h.record_max(lag);
278        }
279        Some((next_sequence, event))
280    }
281
282    /// Try to lease the next available event without copying it out of the ring slot.
283    ///
284    /// The returned lease publishes consumer progress only when dropped, which keeps
285    /// the backing slot valid for the lease lifetime.
286    pub fn try_consume_next_leased(&mut self) -> Option<SharedConsumerLease<'_, E>> {
287        let Some((next_sequence, upper)) = self.available_batch_bounds() else {
288            if let Some(h) = &self.counters.consumer_empty_spins {
289                h.inc();
290            }
291            return None;
292        };
293        if let Some(h) = &self.counters.consumer_lag_max {
294            let lag = (upper - next_sequence).max(0) as u64;
295            h.record_max(lag);
296        }
297        let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
298        // events_consumed is incremented when the lease drops (since the
299        // consume isn't durable until publish_consumed_sequence runs in
300        // SharedConsumerLease::Drop). Track via the drop side below.
301        Some(SharedConsumerLease {
302            consumer: self,
303            sequence: next_sequence,
304            event_ptr,
305        })
306    }
307
308    #[inline]
309    fn available_batch_bounds(&self) -> Option<(Sequence, Sequence)> {
310        assert!(
311            self.last_processed_sequence >= -1,
312            "consumer sequence must not be lower than -1"
313        );
314
315        let producer_seq = self.producer_sequence.load(Ordering::Acquire);
316        let next_sequence = self.last_processed_sequence + 1;
317
318        if next_sequence > producer_seq {
319            return None;
320        }
321
322        Some((next_sequence, producer_seq))
323    }
324
325    #[inline]
326    fn publish_consumed_sequence(&mut self, sequence: Sequence) {
327        // Publish consumer progress once per consumed snapshot batch. Keeping the
328        // shared cursor slightly behind while callbacks run is safe because it can
329        // only make the producer more conservative; it cannot permit overwrite of
330        // unread slots.
331        self.consumer_sequence.store(sequence, Ordering::Release);
332        self.last_processed_sequence = sequence;
333    }
334
335    #[inline]
336    fn is_end_of_batch(&self) -> bool {
337        self.last_processed_sequence >= self.producer_sequence.load(Ordering::Acquire)
338    }
339
340    #[inline]
341    fn process_snapshot_batch<F>(
342        &mut self,
343        lower: Sequence,
344        upper: Sequence,
345        processor: &mut F,
346    ) -> usize
347    where
348        F: FnMut(&E, Sequence),
349    {
350        let mut processed = 0usize;
351        if let Some(h) = &self.counters.consumer_lag_max {
352            let lag = (upper - lower).max(0) as u64;
353            h.record_max(lag);
354        }
355
356        for sequence in lower..=upper {
357            let event_ptr = self.ring_buffer.get(sequence);
358            let event = unsafe { &*event_ptr };
359            processor(event, sequence);
360            processed += 1;
361        }
362
363        self.publish_consumed_sequence(upper);
364        if let Some(h) = &self.counters.events_consumed {
365            h.add(processed as u64);
366        }
367        processed
368    }
369
370    #[inline]
371    fn process_snapshot_batch_with_eob<F>(
372        &mut self,
373        lower: Sequence,
374        upper: Sequence,
375        processor: &mut F,
376    ) -> usize
377    where
378        F: FnMut(&E, Sequence, bool),
379    {
380        let mut processed = 0usize;
381        if let Some(h) = &self.counters.consumer_lag_max {
382            let lag = (upper - lower).max(0) as u64;
383            h.record_max(lag);
384        }
385
386        for sequence in lower..=upper {
387            let event_ptr = self.ring_buffer.get(sequence);
388            let event = unsafe { &*event_ptr };
389            let end_of_batch = sequence == upper;
390            processor(event, sequence, end_of_batch);
391            processed += 1;
392        }
393
394        self.publish_consumed_sequence(upper);
395        if let Some(h) = &self.counters.events_consumed {
396            h.add(processed as u64);
397        }
398        processed
399    }
400
401    /// Wait for and consume the next event (blocking)
402    /// Returns the sequence and event data
403    pub fn consume_next(&mut self) -> (Sequence, E) {
404        loop {
405            if let Some((seq, event)) = self.try_consume_next() {
406                return (seq, event);
407            }
408            #[cfg(dst)]
409            if crate::dst::buggify(file!(), line!()) {
410                std::thread::yield_now();
411            }
412            // High performance: Use spin_loop for maximum throughput
413            // This matches the performance of manual polling approach
414            std::hint::spin_loop();
415        }
416    }
417
418    /// Wait for and lease the next event (blocking).
419    pub fn consume_next_leased(&mut self) -> SharedConsumerLease<'_, E> {
420        loop {
421            if let Some((next_sequence, _)) = self.available_batch_bounds() {
422                let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
423                return SharedConsumerLease {
424                    consumer: self,
425                    sequence: next_sequence,
426                    event_ptr,
427                };
428            }
429            std::hint::spin_loop();
430        }
431    }
432
433    /// Wait for and consume the next event (blocking with sleep for CPU efficiency)
434    /// Returns the sequence and event data
435    /// Use this when you want better CPU efficiency at the cost of throughput
436    pub fn consume_next_with_sleep(&mut self) -> (Sequence, E) {
437        loop {
438            if let Some((seq, event)) = self.try_consume_next() {
439                return (seq, event);
440            }
441            // CPU efficient: Use sleep to reduce CPU usage (lower throughput)
442            // TODO: Implement proper blocking with futex/condition variables
443            super::wait::perform_default_consume_sleep_wait();
444        }
445    }
446
447    /// Wait for and lease the next event (blocking with sleep for CPU efficiency).
448    pub fn consume_next_leased_with_sleep(&mut self) -> SharedConsumerLease<'_, E> {
449        loop {
450            if let Some((next_sequence, _)) = self.available_batch_bounds() {
451                let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
452                return SharedConsumerLease {
453                    consumer: self,
454                    sequence: next_sequence,
455                    event_ptr,
456                };
457            }
458            super::wait::perform_default_consume_sleep_wait();
459        }
460    }
461
462    /// Process next available event with blocking semantics (HIGH PERFORMANCE)
463    /// This method blocks until an event is available, then processes it
464    /// Uses `spin_loop()` for maximum throughput (CPU intensive)
465    /// Returns the sequence and whether this was the end of a batch
466    pub fn process_next_blocking<F>(&mut self, mut processor: F) -> (Sequence, bool)
467    where
468        F: FnMut(&E, Sequence, bool),
469    {
470        // Block until an event is available (high performance - uses spin_loop)
471        let (sequence, event) = self.consume_next();
472
473        // Check if more events are immediately available (end_of_batch detection)
474        let end_of_batch = self.is_end_of_batch();
475
476        // Process the event with end_of_batch information
477        processor(&event, sequence, end_of_batch);
478
479        (sequence, end_of_batch)
480    }
481
482    /// Process next available event with blocking semantics (CPU EFFICIENT)
483    /// This method blocks until an event is available, then processes it
484    /// Uses `sleep()` for better CPU efficiency (lower throughput)
485    /// Returns the sequence and whether this was the end of a batch
486    pub fn process_next_blocking_with_sleep<F>(&mut self, mut processor: F) -> (Sequence, bool)
487    where
488        F: FnMut(&E, Sequence, bool),
489    {
490        // Block until an event is available (CPU efficient - uses sleep)
491        let (sequence, event) = self.consume_next_with_sleep();
492
493        // Check if more events are immediately available (end_of_batch detection)
494        let end_of_batch = self.is_end_of_batch();
495
496        // Process the event with end_of_batch information
497        processor(&event, sequence, end_of_batch);
498
499        (sequence, end_of_batch)
500    }
501
502    /// Process available events with blocking semantics + batch processing (HIGH PERFORMANCE)
503    /// This method blocks until at least one event is available, then processes ALL available events
504    /// This matches the performance characteristics of manual polling by processing in batches
505    /// Returns the number of events processed
506    pub fn process_available_blocking<F>(&mut self, mut processor: F) -> usize
507    where
508        F: FnMut(&E, Sequence, bool),
509    {
510        let mut processed = 0usize;
511
512        loop {
513            if let Some((lower, upper)) = self.available_batch_bounds() {
514                processed += self.process_snapshot_batch_with_eob(lower, upper, &mut processor);
515                break;
516            }
517
518            // High performance blocking wait for the first batch.
519            std::hint::spin_loop();
520        }
521
522        while let Some((lower, upper)) = self.available_batch_bounds() {
523            processed += self.process_snapshot_batch_with_eob(lower, upper, &mut processor);
524        }
525
526        processed
527    }
528
529    /// Process available events with a callback function
530    /// Returns the number of events processed by this consumer
531    pub fn process_available<F>(&mut self, mut processor: F) -> usize
532    where
533        F: FnMut(&E, Sequence),
534    {
535        #[cfg(dst)]
536        if crate::dst::buggify(file!(), line!()) {
537            return 0;
538        }
539
540        let mut processed = 0usize;
541        let mut observed_batch = false;
542
543        while let Some((lower, upper)) = self.available_batch_bounds() {
544            observed_batch = true;
545            processed += self.process_snapshot_batch(lower, upper, &mut processor);
546        }
547
548        if !observed_batch {
549            if let Some(h) = &self.counters.consumer_empty_spins {
550                h.inc();
551            }
552        }
553
554        processed
555    }
556
557    /// Get the last sequence processed by this consumer
558    pub fn current_sequence(&self) -> Sequence {
559        self.last_processed_sequence
560    }
561
562    /// Get the current producer sequence (for debugging)
563    pub fn producer_sequence(&self) -> Sequence {
564        // Acquire load gives a coherent producer cursor for diagnostics.
565        self.producer_sequence.load(Ordering::Acquire)
566    }
567
568    /// Get this consumer's sequence (for debugging)
569    pub fn consumer_sequence(&self) -> Sequence {
570        // Acquire load keeps debug output in the same ordering domain as runtime reads.
571        self.consumer_sequence.load(Ordering::Acquire)
572    }
573
574    /// Get debug information about sequences
575    pub fn debug_sequences(&self) -> (Sequence, Sequence, Sequence) {
576        let producer_seq = self.producer_sequence.load(Ordering::Acquire);
577        let consumer_seq = self.consumer_sequence.load(Ordering::Acquire);
578        (self.last_processed_sequence, producer_seq, consumer_seq)
579    }
580
581    /// Get consumer ID
582    pub fn consumer_id(&self) -> &str {
583        &self.consumer_id
584    }
585}
586
587// Note: SharedConsumer doesn't need a Drop implementation because:
588// 1. The SharedRingBuffer it holds is not owned (attached, not created)
589// 2. The SharedCursors (producer_sequence, consumer_sequence, consumers_ready) have their own Drop impl
590// 3. Consumers don't create shared memory segments, they only attach to existing ones