use crate::{SharedCursor, SharedRingBuffer};
use disruptor_core::Sequence;
use std::ops::Deref;
use std::sync::atomic::Ordering;
pub struct SharedConsumer<E> {
ring_buffer: SharedRingBuffer<E>,
producer_sequence: SharedCursor,
consumer_sequence: SharedCursor,
consumer_id: String,
last_processed_sequence: Sequence,
consumers_ready: Option<SharedCursor>,
counters: ConsumerCounters,
}
#[derive(Default, Debug)]
pub struct ConsumerCounters {
pub events_consumed: Option<crate::observability::CounterHandle>,
pub consumer_empty_spins: Option<crate::observability::CounterHandle>,
pub consumer_lag_max: Option<crate::observability::CounterHandle>,
}
#[derive(Clone, Copy, Debug)]
pub struct ConsumerCounterSelection {
pub events_consumed: bool,
pub consumer_empty_spins: bool,
pub consumer_lag_max: bool,
}
impl ConsumerCounterSelection {
pub const FULL: Self = Self {
events_consumed: true,
consumer_empty_spins: true,
consumer_lag_max: true,
};
pub const LITE: Self = Self {
events_consumed: true,
consumer_empty_spins: false,
consumer_lag_max: false,
};
}
pub struct SharedConsumerLease<'a, E>
where
E: Copy + Default,
{
consumer: &'a mut SharedConsumer<E>,
sequence: Sequence,
event_ptr: *const E,
}
impl<E> SharedConsumerLease<'_, E>
where
E: Copy + Default,
{
pub fn sequence(&self) -> Sequence {
self.sequence
}
}
impl<E> Deref for SharedConsumerLease<'_, E>
where
E: Copy + Default,
{
type Target = E;
fn deref(&self) -> &Self::Target {
unsafe { &*self.event_ptr }
}
}
impl<E> Drop for SharedConsumerLease<'_, E>
where
E: Copy + Default,
{
fn drop(&mut self) {
self.consumer.publish_consumed_sequence(self.sequence);
if let Some(h) = &self.consumer.counters.events_consumed {
h.inc();
}
}
}
impl<E> SharedConsumer<E>
where
E: Copy + Default,
{
pub(crate) fn new_with_coordination(
ring_buffer: SharedRingBuffer<E>,
producer_sequence: SharedCursor,
consumer_sequence: SharedCursor,
consumer_id: String,
base_name: Option<String>,
) -> Self {
assert!(!consumer_id.is_empty(), "consumer_id must not be empty");
let consumers_ready = base_name.as_ref().and_then(|name| {
let coordination_name = format!("{}_cr", name);
SharedCursor::attach(&coordination_name).ok()
});
let mut consumer = Self {
ring_buffer,
producer_sequence,
consumer_sequence,
consumer_id,
last_processed_sequence: -1,
consumers_ready,
counters: ConsumerCounters::default(),
};
consumer.last_processed_sequence = consumer.consumer_sequence.load(Ordering::Acquire);
consumer.signal_readiness();
consumer
}
pub fn signal_readiness(&self) {
if let Some(consumers_ready) = &self.consumers_ready {
consumers_ready.fetch_add(1, Ordering::AcqRel);
}
}
pub fn try_attach_coordination(&mut self, base_name: &str) -> bool {
assert!(!base_name.is_empty(), "base_name must not be empty");
if self.consumers_ready.is_some() {
return true; }
let coordination_name = format!("{}_cr", base_name);
if let Ok(cursor) = SharedCursor::attach(&coordination_name) {
self.consumers_ready = Some(cursor);
self.signal_readiness(); return true;
}
false
}
pub fn has_coordination_support(&self) -> bool {
self.consumers_ready.is_some()
}
pub fn attach_counters(&mut self, file: &crate::observability::CountersFile) {
self.attach_counters_selected(file, ConsumerCounterSelection::FULL);
}
pub fn attach_counters_selected(
&mut self,
file: &crate::observability::CountersFile,
selection: ConsumerCounterSelection,
) {
use crate::observability::{ids, COUNTER_FLAG_CONSUMER};
self.counters.events_consumed = if selection.events_consumed {
file.register(
ids::EVENTS_CONSUMED,
COUNTER_FLAG_CONSUMER,
"events_consumed",
)
} else {
None
};
self.counters.consumer_empty_spins = if selection.consumer_empty_spins {
file.register(
ids::CONSUMER_EMPTY_SPINS,
COUNTER_FLAG_CONSUMER,
"consumer_empty_spins",
)
} else {
None
};
self.counters.consumer_lag_max = if selection.consumer_lag_max {
file.register(
ids::CONSUMER_LAG_MAX,
COUNTER_FLAG_CONSUMER,
"consumer_lag_max",
)
} else {
None
};
}
pub fn counters(&self) -> &ConsumerCounters {
&self.counters
}
#[inline]
pub fn record_consume_latency_ns(&self, ns: u64) {
#[cfg(feature = "metrics")]
metrics::histogram!("disruptor_mp_consume_latency_ns").record(ns as f64);
#[cfg(not(feature = "metrics"))]
let _ = ns;
}
pub fn try_consume_next(&mut self) -> Option<(Sequence, E)> {
let Some((next_sequence, upper)) = self.available_batch_bounds() else {
if let Some(h) = &self.counters.consumer_empty_spins {
h.inc();
}
return None;
};
let event_ptr = self.ring_buffer.get(next_sequence);
let event = unsafe { *event_ptr };
self.publish_consumed_sequence(next_sequence);
if let Some(h) = &self.counters.events_consumed {
h.inc();
}
if let Some(h) = &self.counters.consumer_lag_max {
let lag = (upper - next_sequence).max(0) as u64;
h.record_max(lag);
}
Some((next_sequence, event))
}
pub fn try_consume_next_leased(&mut self) -> Option<SharedConsumerLease<'_, E>> {
let Some((next_sequence, upper)) = self.available_batch_bounds() else {
if let Some(h) = &self.counters.consumer_empty_spins {
h.inc();
}
return None;
};
if let Some(h) = &self.counters.consumer_lag_max {
let lag = (upper - next_sequence).max(0) as u64;
h.record_max(lag);
}
let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
Some(SharedConsumerLease {
consumer: self,
sequence: next_sequence,
event_ptr,
})
}
#[inline]
fn available_batch_bounds(&self) -> Option<(Sequence, Sequence)> {
assert!(
self.last_processed_sequence >= -1,
"consumer sequence must not be lower than -1"
);
let producer_seq = self.producer_sequence.load(Ordering::Acquire);
let next_sequence = self.last_processed_sequence + 1;
if next_sequence > producer_seq {
return None;
}
Some((next_sequence, producer_seq))
}
#[inline]
fn publish_consumed_sequence(&mut self, sequence: Sequence) {
self.consumer_sequence.store(sequence, Ordering::Release);
self.last_processed_sequence = sequence;
}
#[inline]
fn is_end_of_batch(&self) -> bool {
self.last_processed_sequence >= self.producer_sequence.load(Ordering::Acquire)
}
#[inline]
fn process_snapshot_batch<F>(
&mut self,
lower: Sequence,
upper: Sequence,
processor: &mut F,
) -> usize
where
F: FnMut(&E, Sequence),
{
let mut processed = 0usize;
if let Some(h) = &self.counters.consumer_lag_max {
let lag = (upper - lower).max(0) as u64;
h.record_max(lag);
}
for sequence in lower..=upper {
let event_ptr = self.ring_buffer.get(sequence);
let event = unsafe { &*event_ptr };
processor(event, sequence);
processed += 1;
}
self.publish_consumed_sequence(upper);
if let Some(h) = &self.counters.events_consumed {
h.add(processed as u64);
}
processed
}
#[inline]
fn process_snapshot_batch_with_eob<F>(
&mut self,
lower: Sequence,
upper: Sequence,
processor: &mut F,
) -> usize
where
F: FnMut(&E, Sequence, bool),
{
let mut processed = 0usize;
if let Some(h) = &self.counters.consumer_lag_max {
let lag = (upper - lower).max(0) as u64;
h.record_max(lag);
}
for sequence in lower..=upper {
let event_ptr = self.ring_buffer.get(sequence);
let event = unsafe { &*event_ptr };
let end_of_batch = sequence == upper;
processor(event, sequence, end_of_batch);
processed += 1;
}
self.publish_consumed_sequence(upper);
if let Some(h) = &self.counters.events_consumed {
h.add(processed as u64);
}
processed
}
pub fn consume_next(&mut self) -> (Sequence, E) {
loop {
if let Some((seq, event)) = self.try_consume_next() {
return (seq, event);
}
#[cfg(dst)]
if crate::dst::buggify(file!(), line!()) {
std::thread::yield_now();
}
std::hint::spin_loop();
}
}
pub fn consume_next_leased(&mut self) -> SharedConsumerLease<'_, E> {
loop {
if let Some((next_sequence, _)) = self.available_batch_bounds() {
let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
return SharedConsumerLease {
consumer: self,
sequence: next_sequence,
event_ptr,
};
}
std::hint::spin_loop();
}
}
pub fn consume_next_with_sleep(&mut self) -> (Sequence, E) {
loop {
if let Some((seq, event)) = self.try_consume_next() {
return (seq, event);
}
super::wait::perform_default_consume_sleep_wait();
}
}
pub fn consume_next_leased_with_sleep(&mut self) -> SharedConsumerLease<'_, E> {
loop {
if let Some((next_sequence, _)) = self.available_batch_bounds() {
let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
return SharedConsumerLease {
consumer: self,
sequence: next_sequence,
event_ptr,
};
}
super::wait::perform_default_consume_sleep_wait();
}
}
pub fn process_next_blocking<F>(&mut self, mut processor: F) -> (Sequence, bool)
where
F: FnMut(&E, Sequence, bool),
{
let (sequence, event) = self.consume_next();
let end_of_batch = self.is_end_of_batch();
processor(&event, sequence, end_of_batch);
(sequence, end_of_batch)
}
pub fn process_next_blocking_with_sleep<F>(&mut self, mut processor: F) -> (Sequence, bool)
where
F: FnMut(&E, Sequence, bool),
{
let (sequence, event) = self.consume_next_with_sleep();
let end_of_batch = self.is_end_of_batch();
processor(&event, sequence, end_of_batch);
(sequence, end_of_batch)
}
pub fn process_available_blocking<F>(&mut self, mut processor: F) -> usize
where
F: FnMut(&E, Sequence, bool),
{
let mut processed = 0usize;
loop {
if let Some((lower, upper)) = self.available_batch_bounds() {
processed += self.process_snapshot_batch_with_eob(lower, upper, &mut processor);
break;
}
std::hint::spin_loop();
}
while let Some((lower, upper)) = self.available_batch_bounds() {
processed += self.process_snapshot_batch_with_eob(lower, upper, &mut processor);
}
processed
}
pub fn process_available<F>(&mut self, mut processor: F) -> usize
where
F: FnMut(&E, Sequence),
{
#[cfg(dst)]
if crate::dst::buggify(file!(), line!()) {
return 0;
}
let mut processed = 0usize;
let mut observed_batch = false;
while let Some((lower, upper)) = self.available_batch_bounds() {
observed_batch = true;
processed += self.process_snapshot_batch(lower, upper, &mut processor);
}
if !observed_batch {
if let Some(h) = &self.counters.consumer_empty_spins {
h.inc();
}
}
processed
}
pub fn current_sequence(&self) -> Sequence {
self.last_processed_sequence
}
pub fn producer_sequence(&self) -> Sequence {
self.producer_sequence.load(Ordering::Acquire)
}
pub fn consumer_sequence(&self) -> Sequence {
self.consumer_sequence.load(Ordering::Acquire)
}
pub fn debug_sequences(&self) -> (Sequence, Sequence, Sequence) {
let producer_seq = self.producer_sequence.load(Ordering::Acquire);
let consumer_seq = self.consumer_sequence.load(Ordering::Acquire);
(self.last_processed_sequence, producer_seq, consumer_seq)
}
pub fn consumer_id(&self) -> &str {
&self.consumer_id
}
}